/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.ql;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.apache.hadoop.hive.common.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.DefaultMetaStoreFilterHookImpl;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
import org.apache.hadoop.hive.metastore.MetaStoreFilterHook;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
import org.apache.hadoop.hive.metastore.api.TxnInfo;
import org.apache.hadoop.hive.metastore.api.TxnState;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.service.AcidHouseKeeperService;
import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorTestUtilities;
import org.apache.thrift.TException;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;

import static java.util.Arrays.asList;
import static org.apache.commons.collections.CollectionUtils.isEqualCollection;
import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_TABLE_PATTERN;

/**
 * The LockManager is not ready, but for no-concurrency straight-line path we can
 * test AC=true, and AC=false with commit/rollback/exception and test resulting data.
 *
 * Can also test, calling commit in AC=true mode, etc, toggling AC...
 *
 * Tests here are for multi-statement transactions (WIP) and others
 * Mostly uses bucketed tables
 */
public class TestTxnCommands extends TxnCommandsBaseForTests {
  static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommands.class);
  private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
      File.separator + TestTxnCommands.class.getCanonicalName() + "-" + System.currentTimeMillis()
  ).getPath().replaceAll("\\\\", "/");

  @Rule
  public ExpectedException expectedException = ExpectedException.none();
  @Override
  protected String getTestDataDir() {
    return TEST_DATA_DIR;
  }

  @Override
  void initHiveConf() {
    super.initHiveConf();
    //TestTxnCommandsWithSplitUpdateAndVectorization has the vectorized version
    //of these tests.
    HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
    HiveConf.setVar(hiveConf, HiveConf.ConfVars.DYNAMIC_PARTITIONING_MODE, "nonstrict");
    HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_DROP_PARTITION_USE_BASE, false);
    HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_RENAME_PARTITION_MAKE_COPY, false);
    HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX, false);
    HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_TRUNCATE_USE_BASE, false);
    
    MetastoreConf.setClass(hiveConf, MetastoreConf.ConfVars.FILTER_HOOK,
      DummyMetaStoreFilterHookImpl.class, MetaStoreFilterHook.class);

    HiveConf.setVar(hiveConf, HiveConf.ConfVars.HIVE_METASTORE_WAREHOUSE_EXTERNAL, 
      new Path(getWarehouseDir(), "ext").toUri().getPath());
  }

  public static class DummyMetaStoreFilterHookImpl extends DefaultMetaStoreFilterHookImpl {
    private static boolean blockResults = false;
    
    public DummyMetaStoreFilterHookImpl(Configuration conf) {
      super(conf);
    }
    @Override
    public List<String> filterTableNames(String catName, String dbName, List<String> tableList) {
      if (blockResults) {
        return new ArrayList<>();
      }
      return tableList;
    }
  }

  /**
   * tests that a failing Insert Overwrite (which creates a new base_x) is properly marked as
   * aborted.
   */
  @Test
  public void testInsertOverwrite() throws Exception {
    runStatementOnDriver("insert overwrite table " + Table.NONACIDORCTBL + " select a,b from " + Table.NONACIDORCTBL2);
    runStatementOnDriver("create table " + Table.NONACIDORCTBL2 + "3(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
    runStatementOnDriver("insert into " + Table.ACIDTBL + " values(1,2)");
    List<String> rs = runStatementOnDriver("select a from " + Table.ACIDTBL + " where b = 2");
    Assert.assertEquals(1, rs.size());
    Assert.assertEquals("1", rs.get(0));
    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_TEST_MODE_ROLLBACK_TXN, true);
    runStatementOnDriver("insert overwrite table " + Table.ACIDTBL + " values(3,2)");
    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_TEST_MODE_ROLLBACK_TXN, false);
    runStatementOnDriver("insert into " + Table.ACIDTBL + " values(5,6)");
    rs = runStatementOnDriver("select a from " + Table.ACIDTBL + " order by a");
    Assert.assertEquals(2, rs.size());
    Assert.assertEquals("1", rs.get(0));
    Assert.assertEquals("5", rs.get(1));
  }

  @Ignore("not needed but useful for testing")
  @Test
  public void testNonAcidInsert() throws Exception {
    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
    List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(2,3)");
    List<String> rs1 = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
  }

  /**
   * Useful for debugging.  Dumps ORC file in JSON to CWD.
   */
  private void dumpBucketData(Table table, long writeId, int stmtId, int bucketNum) throws Exception {
    if(true) {
      return;
    }
    Path bucket = AcidUtils.createBucketFile(new Path(new Path(getWarehouseDir(), table.toString().toLowerCase()), AcidUtils.deltaSubdir(writeId, writeId, stmtId)), bucketNum);
    FileOutputStream delta = new FileOutputStream(testName.getMethodName() + "_" + bucket.getParent().getName() + "_" +  bucket.getName());
//    try {
//      FileDump.printJsonData(conf, bucket.toString(), delta);
//    }
//    catch(FileNotFoundException ex) {
//      ; //this happens if you change BUCKET_COUNT
//    }
    delta.close();
  }
  /**
   * Dump all data in the table by bucket in JSON format
   */
  private void dumpTableData(Table table, long writeId, int stmtId) throws Exception {
    for(int bucketNum = 0; bucketNum < BUCKET_COUNT; bucketNum++) {
      dumpBucketData(table, writeId, stmtId, bucketNum);
    }
  }
  @Test
  public void testSimpleAcidInsert() throws Exception {
    int[][] rows1 = {{1,2},{3,4}};
    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
    //List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    //Assert.assertEquals("Data didn't match in autocommit=true (rs)", stringifyValues(rows1), rs);
    runStatementOnDriver("START TRANSACTION");
    int[][] rows2 = {{5,6},{7,8}};
    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows2));
    List<String> allData = stringifyValues(rows1);
    allData.addAll(stringifyValues(rows2));
    List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    Assert.assertEquals("Data didn't match inside tx (rs0)", allData, rs0);
    runStatementOnDriver("COMMIT WORK");
    dumpTableData(Table.ACIDTBL, 1, 0);
    dumpTableData(Table.ACIDTBL, 2, 0);
    runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    CommandProcessorException e = runStatementOnDriverNegative("COMMIT"); //txn started implicitly by previous statement
    Assert.assertEquals("Error didn't match: " + e,
        ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN.getErrorCode(), e.getErrorCode());
    List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    Assert.assertEquals("Data didn't match inside tx (rs0)", allData, rs1);
  }

  @Test
  public void testMmExim() throws Exception {
    String tableName = "mm_table", importName = tableName + "_import";
    runStatementOnDriver("drop table if exists " + tableName);
    runStatementOnDriver(String.format("create table %s (a int, b int) stored as orc " +
        "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')",
        tableName));

    // Regular insert: export some MM deltas, then import into a new table.
    int[][] rows1 = {{1,2},{3,4}};
    runStatementOnDriver(String.format("insert into %s (a,b) %s",
        tableName, makeValuesClause(rows1)));
    runStatementOnDriver(String.format("insert into %s (a,b) %s",
        tableName, makeValuesClause(rows1)));
    IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
    org.apache.hadoop.hive.metastore.api.Table table = msClient.getTable("default", tableName);
    FileSystem fs = FileSystem.get(hiveConf);
    Path exportPath = new Path(table.getSd().getLocation() + "_export");
    fs.delete(exportPath, true);
    runStatementOnDriver(String.format("export table %s to '%s'", tableName, exportPath));
    List<String> paths = listPathsRecursive(fs, exportPath);
    verifyMmExportPaths(paths, 2);
    runStatementOnDriver(String.format("import table %s from '%s'", importName, exportPath));
    org.apache.hadoop.hive.metastore.api.Table imported = msClient.getTable("default", importName);
    Assert.assertEquals(imported.toString(), "insert_only",
        imported.getParameters().get("transactional_properties"));
    Path importPath = new Path(imported.getSd().getLocation());
    FileStatus[] stat = fs.listStatus(importPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
    Assert.assertEquals(Arrays.toString(stat), 1, stat.length);
    assertIsDelta(stat[0]);
    List<String> allData = stringifyValues(rows1);
    allData.addAll(stringifyValues(rows1));
    allData.sort(null);
    Collections.sort(allData);
    List<String> rs = runStatementOnDriver(
        String.format("select a,b from %s order by a,b", importName));
    Assert.assertEquals("After import: " + rs, allData, rs);
    runStatementOnDriver("drop table if exists " + importName);

    // Do insert overwrite to create some invalid deltas, and import into a non-MM table.
    int[][] rows2 = {{5,6},{7,8}};
    runStatementOnDriver(String.format("insert overwrite table %s %s",
        tableName, makeValuesClause(rows2)));
    fs.delete(exportPath, true);
    runStatementOnDriver(String.format("export table %s to '%s'", tableName, exportPath));
    paths = listPathsRecursive(fs, exportPath);
    verifyMmExportPaths(paths, 1);
    runStatementOnDriver(String.format("create table %s (a int, b int) stored as orc " +
        "TBLPROPERTIES ('transactional'='false')", importName));
    runStatementOnDriver(String.format("import table %s from '%s'", importName, exportPath));
    imported = msClient.getTable("default", importName);
    Assert.assertNull(imported.toString(), imported.getParameters().get("transactional"));
    Assert.assertNull(imported.toString(),
        imported.getParameters().get("transactional_properties"));
    importPath = new Path(imported.getSd().getLocation());
    stat = fs.listStatus(importPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
    allData = stringifyValues(rows2);
    Collections.sort(allData);
    rs = runStatementOnDriver(String.format("select a,b from %s order by a,b", importName));
    Assert.assertEquals("After import: " + rs, allData, rs);
    runStatementOnDriver("drop table if exists " + importName);
    runStatementOnDriver("drop table if exists " + tableName);
    msClient.close();
  }

  private static final class QueryRunnable implements Runnable {
    private final CountDownLatch cdlIn, cdlOut;
    private final String query;
    private final HiveConf hiveConf;

    QueryRunnable(HiveConf hiveConf, String query, CountDownLatch cdlIn, CountDownLatch cdlOut) {
      this.query = query;
      this.cdlIn = cdlIn;
      this.cdlOut = cdlOut;
      this.hiveConf = new HiveConf(hiveConf);
    }

    @Override
    public void run() {
      SessionState ss = SessionState.start(hiveConf);
      try {
        ss.applyAuthorizationPolicy();
      } catch (HiveException e) {
        throw new RuntimeException(e);
      }
      QueryState qs = new QueryState.Builder().withHiveConf(hiveConf).nonIsolated().build();
      try (Driver d = new Driver(qs)) {
        LOG.info("Ready to run the query: " + query);
        syncThreadStart(cdlIn, cdlOut);
        try {
          try {
            d.run(query);
          } catch (CommandProcessorException e) {
            throw new RuntimeException(query + " failed: " + e);
          }
          d.getResults(new ArrayList<String>());
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }
    }
  }


  private static void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) {
    cdlIn.countDown();
    try {
      cdlOut.await();
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }

  @Test
  public void testParallelInsertStats() throws Exception {
    final int TASK_COUNT = 4;
    String tableName = "mm_table";
    List<ColumnStatisticsObj> stats;
    IMetaStoreClient msClient = prepareParallelTest(tableName, 0);

    String[] queries = new String[TASK_COUNT];
    for (int i = 0; i < queries.length; ++i) {
      queries[i] = String.format("insert into %s (a) values (" + i + ")", tableName);
    }

    runParallelQueries(queries);

    // Verify stats are either invalid, or valid and correct.
    stats = getTxnTableStats(msClient, tableName);
    boolean hasStats = 0 != stats.size();
    if (hasStats) {
      verifyLongStats(TASK_COUNT, 0, TASK_COUNT - 1, stats);
    }

    runStatementOnDriver(String.format("insert into %s (a) values (" + TASK_COUNT + ")", tableName));
    if (!hasStats) {
      // Stats should still be invalid if they were invalid.
      stats = getTxnTableStats(msClient, tableName);
      Assert.assertEquals(0, stats.size());
    }

    // Stats should be valid after analyze.
    runStatementOnDriver(String.format("analyze table %s compute statistics for columns", tableName));
    verifyLongStats(TASK_COUNT + 1, 0, TASK_COUNT, getTxnTableStats(msClient, tableName));
  }

  private void verifyLongStats(int dvCount, int min, int max, List<ColumnStatisticsObj> stats) {
    Assert.assertEquals(1, stats.size());
    LongColumnStatsData data = stats.get(0).getStatsData().getLongStats();
    Assert.assertEquals(min, data.getLowValue());
    Assert.assertEquals(max, data.getHighValue());
    Assert.assertEquals(dvCount, data.getNumDVs());
  }

  private void runParallelQueries(String[] queries)
      throws InterruptedException, ExecutionException {
    ExecutorService executor = Executors.newFixedThreadPool(queries.length);
    final CountDownLatch cdlIn = new CountDownLatch(queries.length), cdlOut = new CountDownLatch(1);
    Future<?>[] tasks = new Future[queries.length];
    for (int i = 0; i < tasks.length; ++i) {
      tasks[i] = executor.submit(new QueryRunnable(hiveConf, queries[i], cdlIn, cdlOut));
    }
    cdlIn.await(); // Wait for all threads to be ready.
    cdlOut.countDown(); // Release them at the same time.
    for (int i = 0; i < tasks.length; ++i) {
      tasks[i].get();
    }
  }

  private IMetaStoreClient prepareParallelTest(String tableName, int val)
      throws Exception, MetaException, TException, NoSuchObjectException {
    hiveConf.setBoolean("hive.stats.autogather", true);
    hiveConf.setBoolean("hive.stats.column.autogather", true);
    // Need to close the thread local Hive object so that configuration change is reflected to HMS.
    Hive.closeCurrent();
    runStatementOnDriver("drop table if exists " + tableName);
    runStatementOnDriver(String.format("create table %s (a int) stored as orc " +
        "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')",
        tableName));
    runStatementOnDriver(String.format("insert into %s (a) values (" + val + ")", tableName));
    runStatementOnDriver(String.format("insert into %s (a) values (" + val + ")", tableName));
    IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
    // Stats should be valid after serial inserts.
    List<ColumnStatisticsObj> stats = getTxnTableStats(msClient, tableName);
    Assert.assertEquals(1, stats.size());
    return msClient;
  }

  @Test
  public void testAddAndDropConstraintAdvancingWriteIds() throws Exception {

    String tableName = "constraints_table";
    hiveConf.setBoolean("hive.stats.autogather", true);
    hiveConf.setBoolean("hive.stats.column.autogather", true);
    // Need to close the thread local Hive object so that configuration change is reflected to HMS.
    Hive.closeCurrent();
    runStatementOnDriver("drop table if exists " + tableName);
    runStatementOnDriver(String.format("create table %s (a int, b string) stored as orc " +
        "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')",
        tableName));
    runStatementOnDriver(String.format("insert into %s (a) values (0)", tableName));
    IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
    String validWriteIds = msClient.getValidWriteIds("default." + tableName).toString();
    LOG.info("ValidWriteIds before add constraint::"+ validWriteIds);
    Assert.assertEquals("default.constraints_table:1:9223372036854775807::", validWriteIds);
    runStatementOnDriver(String.format("alter table %s  ADD CONSTRAINT a_PK PRIMARY KEY (`a`) DISABLE NOVALIDATE", tableName));
    validWriteIds  = msClient.getValidWriteIds("default." + tableName).toString();
    LOG.info("ValidWriteIds after add constraint primary key::"+ validWriteIds);
    Assert.assertEquals("default.constraints_table:2:9223372036854775807::", validWriteIds);
    runStatementOnDriver(String.format("alter table %s CHANGE COLUMN b b STRING NOT NULL", tableName));
    validWriteIds  = msClient.getValidWriteIds("default." + tableName).toString();
    LOG.info("ValidWriteIds after add constraint not null::"+ validWriteIds);
    Assert.assertEquals("default.constraints_table:3:9223372036854775807::", validWriteIds);
    runStatementOnDriver(String.format("alter table %s ADD CONSTRAINT check1 CHECK (a <= 25)", tableName));
    validWriteIds  = msClient.getValidWriteIds("default." + tableName).toString();
    LOG.info("ValidWriteIds after add constraint check::"+ validWriteIds);
    Assert.assertEquals("default.constraints_table:4:9223372036854775807::", validWriteIds);
    runStatementOnDriver(String.format("alter table %s ADD CONSTRAINT unique1 UNIQUE (a, b) DISABLE", tableName));
    validWriteIds  = msClient.getValidWriteIds("default." + tableName).toString();
    LOG.info("ValidWriteIds after add constraint unique::"+ validWriteIds);
    Assert.assertEquals("default.constraints_table:5:9223372036854775807::", validWriteIds);

    LOG.info("ValidWriteIds before drop constraint::"+ validWriteIds);
    runStatementOnDriver(String.format("alter table %s  DROP CONSTRAINT a_PK", tableName));
    validWriteIds  = msClient.getValidWriteIds("default." + tableName).toString();
    Assert.assertEquals("default.constraints_table:6:9223372036854775807::", validWriteIds);
    LOG.info("ValidWriteIds after drop constraint primary key::"+ validWriteIds);
    runStatementOnDriver(String.format("alter table %s  DROP CONSTRAINT check1", tableName));
    validWriteIds  = msClient.getValidWriteIds("default." + tableName).toString();
    Assert.assertEquals("default.constraints_table:7:9223372036854775807::", validWriteIds);
    LOG.info("ValidWriteIds after drop constraint check::"+ validWriteIds);
    runStatementOnDriver(String.format("alter table %s  DROP CONSTRAINT unique1", tableName));
    validWriteIds  = msClient.getValidWriteIds("default." + tableName).toString();
    Assert.assertEquals("default.constraints_table:8:9223372036854775807::", validWriteIds);
    LOG.info("ValidWriteIds after drop constraint unique::"+ validWriteIds);
    runStatementOnDriver(String.format("alter table %s CHANGE COLUMN b b STRING", tableName));
    validWriteIds  = msClient.getValidWriteIds("default." + tableName).toString();
    Assert.assertEquals("default.constraints_table:9:9223372036854775807::", validWriteIds);

  }

  /**
   * If you are disabling or removing this test case, it probably means now we support exchange partition for
   * transactional tables. If that is the case, we also have to make sure we advance the Write IDs during exchange
   * partition DDL for transactional tables. You can look at https://github.com/apache/hive/pull/2465 as an example.
   * @throws Exception
   */
  @Test
  public void exchangePartitionShouldNotWorkForTransactionalTables() throws Exception {
    runStatementOnDriver("create database IF NOT EXISTS db1");
    runStatementOnDriver("create database IF NOT EXISTS db2");

    runStatementOnDriver("CREATE TABLE db1.exchange_part_test1 (f1 string) PARTITIONED BY (ds STRING)");

    String tableName = "db2.exchange_part_test2";
    runStatementOnDriver(String.format("CREATE TABLE %s (f1 string) PARTITIONED BY (ds STRING) " +
    "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')"
    ,tableName));

    runStatementOnDriver("ALTER TABLE db2.exchange_part_test2 ADD PARTITION (ds='2013-04-05')");

    try {
      runStatementOnDriver("ALTER TABLE db1.exchange_part_test1 EXCHANGE PARTITION (ds='2013-04-05') " +
              "WITH TABLE db2.exchange_part_test2");
      Assert.fail("Exchange partition should not be allowed for transaction tables" );
    }catch(Exception e) {
      Assert.assertTrue(e.getMessage().contains("Exchange partition is not allowed with transactional tables"));
    }
  }

  @Test
  public void truncateTableAdvancingWriteId() throws Exception {
    runStatementOnDriver("create database IF NOT EXISTS trunc_db");

    String tableName = "trunc_db.trunc_table";
    IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);

    runStatementOnDriver(String.format("CREATE TABLE %s (f1 string) PARTITIONED BY (ds STRING) " +
                    "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')"
            , tableName));

    String validWriteIds = msClient.getValidWriteIds(tableName).toString();
    LOG.info("ValidWriteIds before truncate table::" + validWriteIds);
    Assert.assertEquals("trunc_db.trunc_table:0:9223372036854775807::", validWriteIds);

    runStatementOnDriver("TRUNCATE TABLE trunc_db.trunc_table");
    validWriteIds = msClient.getValidWriteIds(tableName).toString();
    LOG.info("ValidWriteIds after truncate table::" + validWriteIds);
    Assert.assertEquals("trunc_db.trunc_table:1:9223372036854775807::", validWriteIds);

  }

  @Test
  public void testAddAndDropPartitionAdvancingWriteIds() throws Exception {
    runStatementOnDriver("create database IF NOT EXISTS db1");

    String tableName = "db1.add_drop_partition";
    IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);

    runStatementOnDriver(String.format("CREATE TABLE %s (f1 string) PARTITIONED BY (ds STRING) " +
    "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')"
    ,tableName));

    String validWriteIds = msClient.getValidWriteIds(tableName).toString();
    LOG.info("ValidWriteIds before add partition::"+ validWriteIds);
    Assert.assertEquals("db1.add_drop_partition:0:9223372036854775807::", validWriteIds);
    validWriteIds = msClient.getValidWriteIds(tableName).toString();
    runStatementOnDriver("ALTER TABLE db1.add_drop_partition ADD PARTITION (ds='2013-04-05')");
    validWriteIds = msClient.getValidWriteIds(tableName).toString();
    LOG.info("ValidWriteIds after add partition::"+ validWriteIds);
    Assert.assertEquals("db1.add_drop_partition:1:9223372036854775807::", validWriteIds);
    runStatementOnDriver("ALTER TABLE db1.add_drop_partition DROP PARTITION (ds='2013-04-05')");
    validWriteIds = msClient.getValidWriteIds(tableName).toString();
    LOG.info("ValidWriteIds after drop partition::"+ validWriteIds);
    Assert.assertEquals("db1.add_drop_partition:2:9223372036854775807::", validWriteIds);

  }

  @Test
  public void testDDLsAdvancingWriteIds() throws Exception {
    hiveConf.setBoolVar(HiveConf.ConfVars.TRANSACTIONAL_CONCATENATE_NOBLOCK, true);

    String tableName = "alter_table";
    runStatementOnDriver("drop table if exists " + tableName);
    runStatementOnDriver(String.format("create table %s (a int, b string, c BIGINT, d INT) " +
        "PARTITIONED BY (ds STRING)" +
        "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')",
        tableName));
    runStatementOnDriver(String.format("insert into %s (a) values (0)", tableName));
    IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
    String validWriteIds = msClient.getValidWriteIds("default." + tableName).toString();
    Assert.assertEquals("default.alter_table:1:9223372036854775807::", validWriteIds);

    runStatementOnDriver(String.format("alter table %s SET OWNER USER user_name", tableName));
    validWriteIds  = msClient.getValidWriteIds("default." + tableName).toString();
    Assert.assertEquals("default.alter_table:2:9223372036854775807::", validWriteIds);

    runStatementOnDriver(String.format("alter table %s CLUSTERED BY(c) SORTED BY(d) INTO 32 BUCKETS", tableName));
    validWriteIds  = msClient.getValidWriteIds("default." + tableName).toString();
    Assert.assertEquals("default.alter_table:3:9223372036854775807::", validWriteIds);

    runStatementOnDriver(String.format("ALTER TABLE %s ADD PARTITION (ds='2013-04-05')", tableName));
    validWriteIds  = msClient.getValidWriteIds("default." + tableName).toString();
    Assert.assertEquals("default.alter_table:4:9223372036854775807::", validWriteIds);

    runStatementOnDriver(String.format("ALTER TABLE %s SET SERDEPROPERTIES ('field.delim'='\\u0001')", tableName));
    validWriteIds  = msClient.getValidWriteIds("default." + tableName).toString();
    Assert.assertEquals("default.alter_table:5:9223372036854775807::", validWriteIds);

    runStatementOnDriver(String.format("ALTER TABLE %s PARTITION (ds='2013-04-05') SET FILEFORMAT PARQUET", tableName));
    validWriteIds  = msClient.getValidWriteIds("default." + tableName).toString();
    Assert.assertEquals("default.alter_table:6:9223372036854775807::", validWriteIds);

    // We should not advance the Write ID during compaction, since it affects the performance of
    // materialized views. So, below assertion ensures that we do not advance the write during compaction.
    runStatementOnDriver(String.format("ALTER TABLE %s PARTITION (ds='2013-04-05') COMPACT 'minor'", tableName));
    validWriteIds = msClient.getValidWriteIds("default." + tableName).toString();
    Assert.assertEquals("default.alter_table:6:9223372036854775807::", validWriteIds);

    //Process the compaction request because otherwise the CONCATENATE (major compaction) command on the same table and
    // partition would be refused.
    runWorker(hiveConf);
    runCleaner(hiveConf);

    runStatementOnDriver(String.format("ALTER TABLE %s PARTITION (ds='2013-04-05') CONCATENATE", tableName));
    validWriteIds = msClient.getValidWriteIds("default." + tableName).toString();
    Assert.assertEquals("default.alter_table:7:9223372036854775807::", validWriteIds);

    runStatementOnDriver(String.format("ALTER TABLE %s SKEWED BY (a) ON (1,2)", tableName));
    validWriteIds = msClient.getValidWriteIds("default." + tableName).toString();
    Assert.assertEquals("default.alter_table:8:9223372036854775807::", validWriteIds);

    runStatementOnDriver(String.format("ALTER TABLE %s SET SKEWED LOCATION (1='hdfs://127.0.0.1:8020/abcd/1')",
      tableName));
    validWriteIds = msClient.getValidWriteIds("default." + tableName).toString();
    Assert.assertEquals("default.alter_table:9:9223372036854775807::", validWriteIds);

    runStatementOnDriver(String.format("ALTER TABLE %s NOT SKEWED", tableName));
    validWriteIds = msClient.getValidWriteIds("default." + tableName).toString();
    Assert.assertEquals("default.alter_table:10:9223372036854775807::", validWriteIds);

    runStatementOnDriver(String.format("ALTER TABLE %s UNSET SERDEPROPERTIES ('field.delim')", tableName));
    validWriteIds = msClient.getValidWriteIds("default." + tableName).toString();
    Assert.assertEquals("default.alter_table:11:9223372036854775807::", validWriteIds);

  }

  @Test
  public void testParallelInsertAnalyzeStats() throws Exception {
    String tableName = "mm_table";
    List<ColumnStatisticsObj> stats;
    IMetaStoreClient msClient = prepareParallelTest(tableName, 0);

    String[] queries = {
        String.format("insert into %s (a) values (999)", tableName),
        String.format("analyze table %s compute statistics for columns", tableName)
    };
    runParallelQueries(queries);

    // Verify stats are either invalid, or valid and correct.
    stats = getTxnTableStats(msClient, tableName);
    boolean hasStats = 0 != stats.size();
    if (hasStats) {
      verifyLongStats(2, 0, 999, stats);
    }

    runStatementOnDriver(String.format("insert into %s (a) values (1000)", tableName));
    if (!hasStats) {
      // Stats should still be invalid if they were invalid.
      stats = getTxnTableStats(msClient, tableName);
      Assert.assertEquals(0, stats.size());
    }

    // Stats should be valid after analyze.
    runStatementOnDriver(String.format("analyze table %s compute statistics for columns", tableName));
    verifyLongStats(3, 0, 1000, getTxnTableStats(msClient, tableName));
  }

  @Test
  public void testParallelTruncateAnalyzeStats() throws Exception {
    String tableName = "mm_table";
    List<ColumnStatisticsObj> stats;
    IMetaStoreClient msClient = prepareParallelTest(tableName, 0);

    String[] queries = {
        String.format("truncate table %s", tableName),
        String.format("analyze table %s compute statistics for columns", tableName)
    };
    runParallelQueries(queries);

    // Verify stats are either invalid, or valid and correct.
    stats = getTxnTableStats(msClient, tableName);
    boolean hasStats = 0 != stats.size();
    if (hasStats) {
      // Either the truncate run before or the analyze
      if (stats.get(0).getStatsData().getLongStats().getNumDVs() > 0) {
        verifyLongStats(1, 0, 0, stats);
      } else {
        verifyLongStats(0, 0, 0, stats);
      }
    }

    // Stats should be valid after analyze.
    runStatementOnDriver(String.format("analyze table %s compute statistics for columns", tableName));
    verifyLongStats(0, 0, 0, getTxnTableStats(msClient, tableName));
  }


  @Test
  public void testTxnStatsOnOff() throws Exception {
    String tableName = "mm_table";
    hiveConf.setBoolean("hive.stats.autogather", true);
    hiveConf.setBoolean("hive.stats.column.autogather", true);
    // Need to close the thread local Hive object so that configuration change is reflected to HMS.
    Hive.closeCurrent();
    runStatementOnDriver("drop table if exists " + tableName);
    runStatementOnDriver(String.format("create table %s (a int) stored as orc " +
        "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')",
        tableName));

    runStatementOnDriver(String.format("insert into %s (a) values (1)", tableName));
    IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
    List<ColumnStatisticsObj> stats = getTxnTableStats(msClient, tableName);
    Assert.assertEquals(1, stats.size());
    runStatementOnDriver(String.format("insert into %s (a) values (1)", tableName));
    stats = getTxnTableStats(msClient, tableName);
    Assert.assertEquals(1, stats.size());
    msClient.close();
    hiveConf.setBoolean(MetastoreConf.ConfVars.HIVE_TXN_STATS_ENABLED.getVarname(), false);
    msClient = new HiveMetaStoreClient(hiveConf);
    // Even though the stats are valid in metastore, txn stats are disabled.
    stats = getTxnTableStats(msClient, tableName);
    Assert.assertEquals(0, stats.size());
    msClient.close();
    hiveConf.setBoolean(MetastoreConf.ConfVars.HIVE_TXN_STATS_ENABLED.getVarname(), true);
    msClient = new HiveMetaStoreClient(hiveConf);
    stats = getTxnTableStats(msClient, tableName);
    // Now the stats are visible again.
    Assert.assertEquals(1, stats.size());
    msClient.close();
    hiveConf.setBoolean(MetastoreConf.ConfVars.HIVE_TXN_STATS_ENABLED.getVarname(), false);
    // Need to close the thread local Hive object so that configuration change is reflected to HMS.
    Hive.closeCurrent();
    // Running the query with stats disabled will cause stats in metastore itself to become invalid.
    runStatementOnDriver(String.format("insert into %s (a) values (1)", tableName));
    hiveConf.setBoolean(MetastoreConf.ConfVars.HIVE_TXN_STATS_ENABLED.getVarname(), true);
    msClient = new HiveMetaStoreClient(hiveConf);
    stats = getTxnTableStats(msClient, tableName);
    Assert.assertEquals(0, stats.size());
    msClient.close();
  }

  public List<ColumnStatisticsObj> getTxnTableStats(IMetaStoreClient msClient,
      String tableName) throws TException, NoSuchObjectException, MetaException {
    String validWriteIds;
    List<ColumnStatisticsObj> stats;
    validWriteIds = msClient.getValidWriteIds("default." + tableName).toString();
    stats = msClient.getTableColumnStatistics(
        "default", tableName, Lists.newArrayList("a"), Constants.HIVE_ENGINE, validWriteIds);
    return stats;
  }

  private void assertIsDelta(FileStatus stat) {
    Assert.assertTrue(stat.toString(),
        stat.getPath().getName().startsWith(AcidUtils.DELTA_PREFIX));
  }

  private void verifyMmExportPaths(List<String> paths, int deltasOrBases) {
    // 1 file, 1 dir for each, for now. Plus export "data" dir.
    // This could be changed to a flat file list later.
    Assert.assertEquals(paths.toString(), 2 * deltasOrBases + 1, paths.size());
    // No confusing directories in export.
    for (String path : paths) {
      Assert.assertFalse(path, path.startsWith(AcidUtils.DELTA_PREFIX));
      Assert.assertFalse(path, path.startsWith(AcidUtils.BASE_PREFIX));
    }
  }

  private List<String> listPathsRecursive(FileSystem fs, Path path) throws IOException {
    List<String> paths = new ArrayList<>();
    LinkedList<Path> queue = new LinkedList<>();
    queue.add(path);
    while (!queue.isEmpty()) {
      Path next = queue.pollFirst();
      FileStatus[] stats = fs.listStatus(next, FileUtils.HIDDEN_FILES_PATH_FILTER);
      for (FileStatus stat : stats) {
        Path child = stat.getPath();
        paths.add(child.toString());
        if (stat.isDirectory()) {
          queue.add(child);
        }
      }
    }
    return paths;
  }


  /**
   * add tests for all transitions - AC=t, AC=t, AC=f, commit (for example)
   * @throws Exception
   */
  @Test
  public void testErrors() throws Exception {
    runStatementOnDriver("start transaction");
    CommandProcessorException e1 = runStatementOnDriverNegative("create table foo(x int, y int)");
    Assert.assertEquals("Expected DDL to fail in an open txn",
        ErrorMsg.OP_NOT_ALLOWED_IN_TXN.getErrorCode(), e1.getErrorCode());
    CommandProcessorException e2 = runStatementOnDriverNegative("update " + Table.ACIDTBL + " set a = 1 where b != 1");
    Assert.assertEquals("Expected update of bucket column to fail",
        "FAILED: SemanticException [Error 10302]: Updating values of bucketing columns is not supported.  Column a.",
        e2.getMessage());
    Assert.assertEquals("Expected update of bucket column to fail",
        ErrorMsg.UPDATE_CANNOT_UPDATE_BUCKET_VALUE.getErrorCode(), e2.getErrorCode());
    CommandProcessorException e3 = runStatementOnDriverNegative("commit"); //not allowed in w/o tx
    Assert.assertEquals("Error didn't match: " + e3,
        ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN.getErrorCode(), e3.getErrorCode());
    CommandProcessorException e4 = runStatementOnDriverNegative("rollback"); //not allowed in w/o tx
    Assert.assertEquals("Error didn't match: " + e4,
        ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN.getErrorCode(), e4.getErrorCode());
    runStatementOnDriver("start transaction");
    CommandProcessorException e5 = runStatementOnDriverNegative("start transaction"); //not allowed in a tx
    Assert.assertEquals("Expected start transaction to fail",
        ErrorMsg.OP_NOT_ALLOWED_IN_TXN.getErrorCode(), e5.getErrorCode());
    runStatementOnDriver("start transaction"); //ok since previously opened txn was killed
    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
    List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    Assert.assertEquals("Can't see my own write", 1, rs0.size());
    runStatementOnDriver("commit work");
    rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    Assert.assertEquals("Can't see my own write", 1, rs0.size());
  }

  @Test
  public void testReadMyOwnInsert() throws Exception {
    runStatementOnDriver("START TRANSACTION");
    List<String> rs = runStatementOnDriver("select * from " + Table.ACIDTBL);
    Assert.assertEquals("Expected empty " + Table.ACIDTBL, 0, rs.size());
    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
    List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    Assert.assertEquals("Can't see my own write", 1, rs0.size());
    runStatementOnDriver("commit");
    runStatementOnDriver("START TRANSACTION");
    List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    runStatementOnDriver("rollback work");
    Assert.assertEquals("Can't see write after commit", 1, rs1.size());
  }
  @Test
  public void testImplicitRollback() throws Exception {
    runStatementOnDriver("START TRANSACTION");
    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
    List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    Assert.assertEquals("Can't see my own write", 1, rs0.size());
    //next command should produce an error
    CommandProcessorException e = runStatementOnDriverNegative("select * from no_such_table");
    Assert.assertEquals("Txn didn't fail?",
        "FAILED: SemanticException [Error 10001]: Line 1:14 Table not found 'no_such_table'",
        e.getMessage());
    runStatementOnDriver("start transaction");
    List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    runStatementOnDriver("commit");
    Assert.assertEquals("Didn't rollback as expected", 0, rs1.size());
  }
  @Test
  public void testExplicitRollback() throws Exception {
    runStatementOnDriver("START TRANSACTION");
    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
    runStatementOnDriver("ROLLBACK");
    List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    Assert.assertEquals("Rollback didn't rollback", 0, rs.size());
  }

  @Test
  public void testMultipleInserts() throws Exception {
    runStatementOnDriver("START TRANSACTION");
    int[][] rows1 = {{1,2},{3,4}};
    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
    int[][] rows2 = {{5,6},{7,8}};
    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows2));
    List<String> allData = stringifyValues(rows1);
    allData.addAll(stringifyValues(rows2));
    List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    Assert.assertEquals("Content didn't match before commit rs", allData, rs);
    runStatementOnDriver("commit");
    dumpTableData(Table.ACIDTBL, 1, 0);
    dumpTableData(Table.ACIDTBL, 1, 1);
    List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    Assert.assertEquals("Content didn't match after commit rs1", allData, rs1);
  }

  @Test
  public void testDeleteOfMultipleInserts() throws Exception {
    runStatementOnDriver("START TRANSACTION");
    int[][] rows1 = {{1,2},{3,4}};
    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
    int[][] rows2 = {{5,6},{7,8}};
    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows2));
    runStatementOnDriver("commit");
    runStatementOnDriver("delete from " + Table.ACIDTBL + " where b = 2");
    runStatementOnDriver("delete from " + Table.ACIDTBL + " where b = 8");
    List<String> rs2 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    int[][] remain = {{3,4},{5,6}};
    Assert.assertEquals("Content didn't match after delete ", stringifyValues(remain), rs2);
  }

  @Test
  public void testDelete() throws Exception {
    int[][] rows1 = {{1,2},{3,4}};
    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
    List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    Assert.assertEquals("Content didn't match rs0", stringifyValues(rows1), rs0);
    runStatementOnDriver("START TRANSACTION");
    runStatementOnDriver("delete from " + Table.ACIDTBL + " where b = 4");
    int[][] updatedData2 = {{1,2}};
    List<String> rs3 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    Assert.assertEquals("Wrong data after delete", stringifyValues(updatedData2), rs3);
    runStatementOnDriver("commit");
    List<String> rs4 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    Assert.assertEquals("Wrong data after commit", stringifyValues(updatedData2), rs4);
  }

  @Test
  public void testUpdateOfInserts() throws Exception {
    int[][] rows1 = {{1,2},{3,4}};
    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
    List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    Assert.assertEquals("Content didn't match rs0", stringifyValues(rows1), rs0);
    runStatementOnDriver("START TRANSACTION");
    int[][] rows2 = {{5,6},{7,8}};
    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows2));
    List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    List<String> allData = stringifyValues(rows1);
    allData.addAll(stringifyValues(rows2));
    Assert.assertEquals("Content didn't match rs1", allData, rs1);
    runStatementOnDriver("update " + Table.ACIDTBL + " set b = 1 where b != 1");
    int[][] updatedData = {{1,1},{3,1},{5,1},{7,1}};
    List<String> rs2 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    Assert.assertEquals("Wrong data after update", stringifyValues(updatedData), rs2);
    runStatementOnDriver("commit");
    List<String> rs4 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    Assert.assertEquals("Wrong data after commit", stringifyValues(updatedData), rs4);
  }
  @Test
  public void testUpdateDeleteOfInserts() throws Exception {
    int[][] rows1 = {{1,2},{3,4}};
    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
    List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    Assert.assertEquals("Content didn't match rs0", stringifyValues(rows1), rs0);
    runStatementOnDriver("START TRANSACTION");
    int[][] rows2 = {{5,6},{7,8}};
    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows2));
    List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    List<String> allData = stringifyValues(rows1);
    allData.addAll(stringifyValues(rows2));
    Assert.assertEquals("Content didn't match rs1", allData, rs1);
    runStatementOnDriver("update " + Table.ACIDTBL + " set b = 1 where b != 1");
    int[][] updatedData = {{1,1},{3,1},{5,1},{7,1}};
    List<String> rs2 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    Assert.assertEquals("Wrong data after update", stringifyValues(updatedData), rs2);
    runStatementOnDriver("delete from " + Table.ACIDTBL + " where a = 7 and b = 1");
    dumpTableData(Table.ACIDTBL, 1, 0);
    dumpTableData(Table.ACIDTBL, 2, 0);
    dumpTableData(Table.ACIDTBL, 2, 2);
    dumpTableData(Table.ACIDTBL, 2, 4);
    int[][] updatedData2 = {{1,1},{3,1},{5,1}};
    List<String> rs3 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    Assert.assertEquals("Wrong data after delete", stringifyValues(updatedData2), rs3);
    runStatementOnDriver("commit");
    List<String> rs4 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    Assert.assertEquals("Wrong data after commit", stringifyValues(updatedData2), rs4);
  }
  @Test
  public void testMultipleDelete() throws Exception {
    int[][] rows1 = {{1,2},{3,4},{5,6},{7,8}};
    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
    List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    Assert.assertEquals("Content didn't match rs0", stringifyValues(rows1), rs0);
    runStatementOnDriver("START TRANSACTION");
    runStatementOnDriver("delete from " + Table.ACIDTBL + " where b = 8");
    int[][] updatedData2 = {{1,2},{3,4},{5,6}};
    List<String> rs2 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    Assert.assertEquals("Wrong data after delete", stringifyValues(updatedData2), rs2);
    runStatementOnDriver("delete from " + Table.ACIDTBL + " where b = 4");
    int[][] updatedData3 = {{1, 2}, {5, 6}};
    List<String> rs3 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    Assert.assertEquals("Wrong data after delete2", stringifyValues(updatedData3), rs3);
    runStatementOnDriver("update " + Table.ACIDTBL + " set b=3");
    dumpTableData(Table.ACIDTBL, 1, 0);
    //nothing actually hashes to bucket0, so update/delete deltas don't have it
    dumpTableData(Table.ACIDTBL, 2, 0);
    dumpTableData(Table.ACIDTBL, 2, 2);
    dumpTableData(Table.ACIDTBL, 2, 4);
    List<String> rs5 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    int [][] updatedData4 = {{1,3},{5,3}};
    Assert.assertEquals("Wrong data after delete", stringifyValues(updatedData4), rs5);
    runStatementOnDriver("commit");
    List<String> rs4 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    Assert.assertEquals("Wrong data after commit", stringifyValues(updatedData4), rs4);
  }
  @Test
  public void testDeleteIn() throws Exception {
    runStatementOnDriver("delete from " + Table.ACIDTBL + " where a IN (SELECT A.a from " +
        Table.ACIDTBL + "  A)");
    int[][] tableData = {{1,2},{3,2},{5,2},{1,3},{3,3},{5,3}};
    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
    runStatementOnDriver("insert into " + Table.ACIDTBL2 + "(a,b,c) values(1,7,17),(3,7,17)");
//    runStatementOnDriver("select b from " + Table.ACIDTBL + " where a in (select b from " + Table.NONACIDORCTBL + ")");
    runStatementOnDriver("delete from " + Table.ACIDTBL + " where a in(select a from " + Table.ACIDTBL2 + ")");
//    runStatementOnDriver("delete from " + Table.ACIDTBL + " where a in(select a from " + Table.NONACIDORCTBL + ")");
    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) select a,b from " + Table.ACIDTBL2);
    List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    int[][] updatedData = {{1,7},{3,7},{5,2},{5,3}};
    Assert.assertEquals("Bulk update failed", stringifyValues(updatedData), rs);
  }
  @Test
  public void testTimeOutReaper() throws Exception {
    runStatementOnDriver("start transaction");
    runStatementOnDriver("delete from " + Table.ACIDTBL + " where a = 5");
    //make sure currently running txn is considered aborted by housekeeper
    hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 2, TimeUnit.MILLISECONDS);
    MetastoreTaskThread houseKeeperService = new AcidHouseKeeperService();
    houseKeeperService.setConf(hiveConf);
    //this will abort the txn
    houseKeeperService.run();
    //this should fail because txn aborted due to timeout
    CommandProcessorException e = runStatementOnDriverNegative("delete from " + Table.ACIDTBL + " where a = 5");
    Assert.assertTrue("Actual: " + e.getMessage(),
        e.getMessage().contains("Transaction manager has aborted the transaction txnid:5"));

    //now test that we don't timeout locks we should not
    //heartbeater should be running in the background every 1/2 second
    hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.SECONDS);
    // Have to reset the conf when we change it so that the change takes affect
    houseKeeperService.setConf(hiveConf);
    runStatementOnDriver("start transaction");
    runStatementOnDriver("select count(*) from " + Table.ACIDTBL + " where a = 17");
    pause(750);

    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);

    //since there is txn open, we are heartbeating the txn not individual locks
    GetOpenTxnsInfoResponse txnsInfoResponse = txnHandler.getOpenTxnsInfo();
    Assert.assertEquals(2, txnsInfoResponse.getOpen_txns().size());
    TxnInfo txnInfo = null;
    for(TxnInfo ti : txnsInfoResponse.getOpen_txns()) {
      if(ti.getState() == TxnState.OPEN) {
        txnInfo = ti;
        break;
      }
    }
    Assert.assertNotNull(txnInfo);
    Assert.assertEquals(6, txnInfo.getId());
    Assert.assertEquals(TxnState.OPEN, txnInfo.getState());
    String s = TestTxnDbUtil
        .queryToString(hiveConf, "select TXN_STARTED, TXN_LAST_HEARTBEAT from TXNS where TXN_ID = " + txnInfo.getId(), false);
    String[] vals = s.split("\\s+");
    Assert.assertEquals("Didn't get expected timestamps", 2, vals.length);
    long lastHeartbeat = Long.parseLong(vals[1]);
    //these 2 values are equal when TXN entry is made.  Should never be equal after 1st heartbeat, which we
    //expect to have happened by now since HIVE_TXN_TIMEOUT=1sec
    Assert.assertNotEquals("Didn't see heartbeat happen", Long.parseLong(vals[0]), lastHeartbeat);

    ShowLocksResponse slr = txnHandler.showLocks(new ShowLocksRequest());
    TestTxnDbUtil.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks());
    pause(750);
    houseKeeperService.run();
    pause(750);
    slr = txnHandler.showLocks(new ShowLocksRequest());
    Assert.assertEquals("Unexpected lock count: " + slr, 1, slr.getLocks().size());
    TestTxnDbUtil.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks());

    pause(750);
    houseKeeperService.run();
    slr = txnHandler.showLocks(new ShowLocksRequest());
    Assert.assertEquals("Unexpected lock count: " + slr, 1, slr.getLocks().size());
    TestTxnDbUtil.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks());

    //should've done several heartbeats
    s = TestTxnDbUtil.queryToString(hiveConf, "select TXN_STARTED, TXN_LAST_HEARTBEAT from TXNS where TXN_ID = " + txnInfo.getId(), false);
    vals = s.split("\\s+");
    Assert.assertEquals("Didn't get expected timestamps", 2, vals.length);
    Assert.assertTrue("Heartbeat didn't progress: (old,new) (" + lastHeartbeat + "," + vals[1]+ ")",
        lastHeartbeat < Long.parseLong(vals[1]));

    runStatementOnDriver("rollback");
    slr = txnHandler.showLocks(new ShowLocksRequest());
    Assert.assertEquals("Unexpected lock count", 0, slr.getLocks().size());
  }
  private static void pause(int timeMillis) {
    try {
      Thread.sleep(timeMillis);
    }
    catch (InterruptedException e) {
    }
  }

  @Test
  public void exchangePartition() throws Exception {
    runStatementOnDriver("create database ex1");
    runStatementOnDriver("create database ex2");

    runStatementOnDriver("CREATE TABLE ex1.exchange_part_test1 (f1 string) PARTITIONED BY (ds STRING)");
    runStatementOnDriver("CREATE TABLE ex2.exchange_part_test2 (f1 string) PARTITIONED BY (ds STRING)");
    runStatementOnDriver("ALTER TABLE ex2.exchange_part_test2 ADD PARTITION (ds='2013-04-05')");
    runStatementOnDriver("ALTER TABLE ex1.exchange_part_test1 EXCHANGE PARTITION (ds='2013-04-05') WITH TABLE ex2.exchange_part_test2");
  }
  @Test
  public void testMergeNegative() throws Exception {
    CommandProcessorException e = runStatementOnDriverNegative(
        "MERGE INTO " + Table.ACIDTBL + " target\n" +
        "USING " + Table.NONACIDORCTBL + " source ON target.a = source.a\n" +
        "WHEN MATCHED THEN UPDATE set b = 1\n" +
        "WHEN MATCHED THEN DELETE\n" +
        "WHEN NOT MATCHED AND a < 1 THEN INSERT VALUES(1,2)");
    Assert.assertEquals(ErrorMsg.MERGE_PREDIACTE_REQUIRED, ((HiveException)e.getCause()).getCanonicalErrorMsg());
  }
  @Test
  public void testMergeNegative2() throws Exception {
    CommandProcessorException e = runStatementOnDriverNegative(
        "MERGE INTO "+ Table.ACIDTBL +
        " target USING " + Table.NONACIDORCTBL + "\n source ON target.pk = source.pk " +
        "\nWHEN MATCHED THEN UPDATE set b = 1 " +
        "\nWHEN MATCHED THEN UPDATE set b=a");
    Assert.assertEquals(ErrorMsg.MERGE_TOO_MANY_UPDATE, ((HiveException)e.getCause()).getCanonicalErrorMsg());
  }

  /**
   * `1` means 1 is a column name and '1' means 1 is a string literal
   * HiveConf.HIVE_QUOTEDID_SUPPORT
   * HiveConf.HIVE_SUPPORT_SPECICAL_CHARACTERS_IN_TABLE_NAMES
   * {@link TestTxnCommands#testMergeType2SCD01()}
   */
  @Test
  public void testQuotedIdentifier() throws Exception {
    String target = "`aci/d_u/ami`";
    String src = "`src/name`";
    runStatementOnDriver("drop table if exists " + target);
    runStatementOnDriver("drop table if exists " + src);
    runStatementOnDriver("create table " + target + "(i int," +
        "`d?*de e` decimal(5,2)," +
        "vc varchar(128)) clustered by (i) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
    runStatementOnDriver("create table " + src + "(gh int, j decimal(5,2), k varchar(128))");
    runStatementOnDriver("merge into " + target + " as `d/8` using " + src + " as `a/b` on i=gh " +
        "\nwhen matched and i > 5 then delete " +
        "\nwhen matched then update set vc='blah' " +
        "\nwhen not matched then insert values(1,2.1,'baz')");
    runStatementOnDriver("merge into " + target + " as `d/8` using " + src + " as `a/b` on i=gh " +
        "\nwhen matched and i > 5 then delete " +
        "\nwhen matched then update set vc='blah',  `d?*de e` = current_timestamp()  " +
        "\nwhen not matched then insert values(1,2.1, concat('baz', current_timestamp()))");
    runStatementOnDriver("merge into " + target + " as `d/8` using " + src + " as `a/b` on i=gh " +
        "\nwhen matched and i > 5 then delete " +
        "\nwhen matched then update set vc='blah' " +
        "\nwhen not matched then insert values(1,2.1,'a\\b')");
    runStatementOnDriver("merge into " + target + " as `d/8` using " + src + " as `a/b` on i=gh " +
        "\nwhen matched and i > 5 then delete " +
        "\nwhen matched then update set vc='∆∋'" +
        "\nwhen not matched then insert values(`a/b`.gh,`a/b`.j,'c\\t')");
  }
  @Test
  public void testQuotedIdentifier2() throws Exception {
    String target = "`aci/d_u/ami`";
    String src = "`src/name`";
    runStatementOnDriver("drop table if exists " + target);
    runStatementOnDriver("drop table if exists " + src);
    runStatementOnDriver("create table " + target + "(i int," +
        "`d?*de e` decimal(5,2)," +
        "vc varchar(128)) clustered by (i) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
    runStatementOnDriver("create table " + src + "(`g/h` int, j decimal(5,2), k varchar(128))");
    runStatementOnDriver("merge into " + target + " as `d/8` using " + src + " as `a/b` on i=`g/h`" +
        "\nwhen matched and `g/h` > 5 then delete " +
        "\nwhen matched and `g/h` < 0 then update set vc='∆∋', `d?*de e` =  `d?*de e` * j + 1" +
        "\nwhen not matched and `d?*de e` <> 0 then insert values(`a/b`.`g/h`,`a/b`.j,`a/b`.k)");
    runStatementOnDriver("merge into " + target + " as `d/8` using " + src + " as `a/b` on i=`g/h`" +
        "\nwhen matched and `g/h` > 5 then delete" +
        "\n when matched and `g/h` < 0 then update set vc='∆∋'  , `d?*de e` =  `d?*de e` * j + 1  " +
        "\n when not matched and `d?*de e` <> 0 then insert values(`a/b`.`g/h`,`a/b`.j,`a/b`.k)");
  }
  /**
   * https://www.linkedin.com/pulse/how-load-slowly-changing-dimension-type-2-using-oracle-padhy
   * also test QuotedIdentifier inside source expression
   * {@link TestTxnCommands#testQuotedIdentifier()}
   * {@link TestTxnCommands#testQuotedIdentifier2()}
   */
  @Test
  public void testMergeType2SCD01() throws Exception {
    runStatementOnDriver("drop table if exists target");
    runStatementOnDriver("drop table if exists source");
    runStatementOnDriver("drop table if exists splitTable");

    runStatementOnDriver("create table splitTable(op int)");
    runStatementOnDriver("insert into splitTable values (0),(1)");
    runStatementOnDriver("create table source (key int, data int)");
    runStatementOnDriver("create table target (key int, data int, cur int) clustered by (key) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
    int[][] targetVals = {{1, 5, 1}, {2, 6, 1}, {1, 18, 0}};
    runStatementOnDriver("insert into target " + makeValuesClause(targetVals));
    int[][] sourceVals = {{1, 7}, {3, 8}};
    runStatementOnDriver("insert into source " + makeValuesClause(sourceVals));
    //augment source with a col which has 1 if it will cause an update in target, 0 otherwise
    String curMatch = "select s.*, case when t.cur is null then 0 else 1 end m from source s left outer join (select * from target where target.cur=1) t on s.key=t.key";
    //split each row (duplicate) which will cause an update into 2 rows and augment with 'op' col which has 0 to insert, 1 to update
    String teeCurMatch = "select curMatch.*, case when splitTable.op is null or splitTable.op = 0 then 0 else 1 end `o/p\\n` from (" + curMatch + ") curMatch left outer join splitTable on curMatch.m=1";
    if(false) {
      //this is just for debug
      List<String> r1 = runStatementOnDriver(curMatch);
      List<String> r2 = runStatementOnDriver(teeCurMatch);
    }
    String stmt = "merge into target t using (" + teeCurMatch + ") s on t.key=s.key and t.cur=1 and s.`o/p\\n`=1 " +
        "when matched then update set cur=0 " +
        "when not matched then insert values(s.key,s.data,1)";
    //to allow cross join from 'teeCurMatch'
    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false);
    runStatementOnDriver(stmt);
    int[][] resultVals = {{1,5,0},{1,7,1},{1,18,0},{2,6,1},{3,8,1}};
    List<String> r = runStatementOnDriver("select * from target order by key,data,cur");
    Assert.assertEquals(stringifyValues(resultVals), r);
  }
  /**
   * https://www.linkedin.com/pulse/how-load-slowly-changing-dimension-type-2-using-oracle-padhy
   * Same as testMergeType2SCD01 but with a more intuitive "source" expression
   */
  @Test
  public void testMergeType2SCD02() throws Exception {
    runStatementOnDriver("drop table if exists target");
    runStatementOnDriver("drop table if exists source");
    runStatementOnDriver("create table source (key int, data int)");
    runStatementOnDriver("create table target (key int, data int, cur int) clustered by (key) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
    int[][] targetVals = {{1, 5, 1}, {2, 6, 1}, {1, 18, 0}};
    runStatementOnDriver("insert into target " + makeValuesClause(targetVals));
    int[][] sourceVals = {{1, 7}, {3, 8}};
    runStatementOnDriver("insert into source " + makeValuesClause(sourceVals));

    String baseSrc =  "select source.*, 0 c from source " +
        "union all " +
        "select source.*, 1 c from source " +
        "inner join target " +
        "on source.key=target.key where target.cur=1";
    if(false) {
      //this is just for debug
      List<String> r1 = runStatementOnDriver(baseSrc);
      List<String> r2 = runStatementOnDriver(
        "select t.*, s.* from target t right outer join (" + baseSrc + ") s " +
          "\non t.key=s.key and t.cur=s.c and t.cur=1");
    }
    String stmt = "merge into target t using " +
        "(" + baseSrc + ") s " +
        "on t.key=s.key and t.cur=s.c and t.cur=1 " +
        "when matched then update set cur=0 " +
        "when not matched then insert values(s.key,s.data,1)";

    runStatementOnDriver(stmt);
    int[][] resultVals = {{1,5,0},{1,7,1},{1,18,0},{2,6,1},{3,8,1}};
    List<String> r = runStatementOnDriver("select * from target order by key,data,cur");
    Assert.assertEquals(stringifyValues(resultVals), r);
  }

  @Test
  public void testMergeOnTezEdges() throws Exception {
    String query = "merge into " + Table.ACIDTBL +
        " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " +
        "WHEN MATCHED AND s.a > 8 THEN DELETE " +
        "WHEN MATCHED THEN UPDATE SET b = 7 " +
        "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) ";
    d.destroy();
    HiveConf hc = new HiveConf(hiveConf);
    hc.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
    hc.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false);
    d = new Driver(hc);
    d.setMaxRows(10000);

    List<String> explain = runStatementOnDriver("explain " + query);
    StringBuilder sb = new StringBuilder();
    for(String s : explain) {
      sb.append(s).append('\n');
    }
    LOG.info("Explain1: " + sb);
    /*
     Edges:
     Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 8 (SIMPLE_EDGE)
     Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
     Reducer 4 <- Reducer 2 (SIMPLE_EDGE)
     Reducer 5 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
     Reducer 6 <- Reducer 2 (SIMPLE_EDGE)
     Reducer 7 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
     */
    for(int i = 0; i < explain.size(); i++) {
      if(explain.get(i).contains("Edges:")) {
        Assert.assertTrue("At i+1=" + (i+1) + explain.get(i + 1),
            explain.get(i + 1).contains("Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 8 (SIMPLE_EDGE)"));
        Assert.assertTrue("At i+1=" + (i+2) + explain.get(i + 2),
            explain.get(i + 2).contains("Reducer 3 <- Reducer 2 (SIMPLE_EDGE)"));
        Assert.assertTrue("At i+1=" + (i+3) + explain.get(i + 3),
            explain.get(i + 3).contains("Reducer 4 <- Reducer 2 (SIMPLE_EDGE)"));
        Assert.assertTrue("At i+1=" + (i+4) + explain.get(i + 4),
            explain.get(i + 4).contains("Reducer 5 <- Reducer 2 (SIMPLE_EDGE)"));
        Assert.assertTrue("At i+1=" + (i+5) + explain.get(i + 5),
            explain.get(i + 5).contains("Reducer 6 <- Reducer 2 (SIMPLE_EDGE)"));
        Assert.assertTrue("At i+1=" + (i+6) + explain.get(i + 6),
            explain.get(i + 6).contains("Reducer 7 <- Reducer 2 (SIMPLE_EDGE)"));
        break;
      }
    }
  }
  @Test
  public void testMergeUpdateDelete() throws Exception {
    int[][] baseValsOdd = {{2,2},{4,44},{5,5},{11,11}};
    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(baseValsOdd));
    int[][] vals = {{2,1},{4,3},{5,6},{7,8}};
    runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals));
    String query = "merge into " + Table.ACIDTBL +
        " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " +
        "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " + //updates (2,1) -> (2,0)
        "WHEN MATCHED and t.a > 3 and t.a < 5 THEN DELETE " +//deletes (4,3)
        "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) "; //inserts (11,11)
    runStatementOnDriver(query);

    List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    int[][] rExpected = {{2,0},{5,6},{7,8},{11,11}};
    Assert.assertEquals(stringifyValues(rExpected), r);
  }
  @Test
  public void testMergeUpdateDeleteNoCardCheck() throws Exception {
    d.destroy();
    HiveConf hc = new HiveConf(hiveConf);
    hc.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, false);
    d = new Driver(hc);
    d.setMaxRows(10000);

    int[][] baseValsOdd = {{2,2},{4,44},{5,5},{11,11}};
    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(baseValsOdd));
    int[][] vals = {{2,1},{4,3},{5,6},{7,8}};
    runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals));
    String query = "merge into " + Table.ACIDTBL +
        " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " +
        "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " +
        "WHEN MATCHED and t.a > 3 and t.a < 5 THEN DELETE ";
    runStatementOnDriver(query);

    List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    int[][] rExpected = {{2,0},{5,6},{7,8}};
    Assert.assertEquals(stringifyValues(rExpected), r);
  }
  @Test
  public void testMergeDeleteUpdate() throws Exception {
    int[][] sourceVals = {{2,2},{4,44},{5,5},{11,11}};
    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(sourceVals));
    int[][] targetVals = {{2,1},{4,3},{5,6},{7,8}};
    runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(targetVals));
    String query = "merge into " + Table.ACIDTBL +
        " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " +
        "WHEN MATCHED and s.a < 5 THEN DELETE " +
        "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " +
        "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) ";
    runStatementOnDriver(query);

    List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
    int[][] rExpected = {{5,6},{7,8},{11,11}};
    Assert.assertEquals(stringifyValues(rExpected), r);
  }

  /**
   * see https://issues.apache.org/jira/browse/HIVE-14949 for details
   * @throws Exception
   */
  @Test
  public void testMergeCardinalityViolation() throws Exception {
    int[][] sourceVals = {{2,2},{2,44},{5,5},{11,11}};
    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(sourceVals));
    int[][] targetVals = {{2,1},{4,3},{5,6},{7,8}};
    runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(targetVals));
    String query = "merge into " + Table.ACIDTBL +
        " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " +
        "WHEN MATCHED and s.a < 5 THEN DELETE " +
        "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " +
        "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) ";
    runStatementOnDriverNegative(query);
    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1'),(3,3,'p1'),(4,4,'p2')");
    query = "merge into " + Table.ACIDTBLPART +
        " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " +
        "WHEN MATCHED and s.a < 5 THEN DELETE " +
        "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " +
        "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b, 'p1') ";
    runStatementOnDriverNegative(query);
  }
  @Test
  public void testSetClauseFakeColumn() throws Exception {
    CommandProcessorException e1 = runStatementOnDriverNegative(
        "MERGE INTO "+ Table.ACIDTBL + " target\n" +
        "USING " + Table.NONACIDORCTBL + "\n" +
        " source ON target.a = source.a\n" +
        "WHEN MATCHED THEN UPDATE set t = 1");
    Assert.assertEquals(ErrorMsg.INVALID_TARGET_COLUMN_IN_SET_CLAUSE,
        ((HiveException)e1.getCause()).getCanonicalErrorMsg());

    CommandProcessorException e2 = runStatementOnDriverNegative("update " + Table.ACIDTBL + " set t = 1");
    Assert.assertEquals(ErrorMsg.INVALID_TARGET_COLUMN_IN_SET_CLAUSE,
        ((HiveException)e2.getCause()).getCanonicalErrorMsg());
  }

  @Test
  public void testBadOnClause() throws Exception {
    CommandProcessorException e =
        runStatementOnDriverNegative(
            "merge into " + Table.ACIDTBL + " trgt\n" +
            "using (select *\n" +
            "       from " + Table.NONACIDORCTBL + " src) sub on sub.a = target.a\n" +
            "when not matched then insert values (sub.a,sub.b)");
    Assert.assertTrue("Error didn't match: " + e, e.getMessage().contains(
        "No columns from target table 'trgt' found in ON clause '`sub`.`a` = `target`.`a`' of MERGE statement."));
  }

  /**
   * Writing UTs that need multiple threads is challenging since Derby chokes on concurrent access.
   * This tests that "AND WAIT" actually blocks and responds to interrupt
   * @throws Exception
   */
  @Test
  public void testCompactionBlocking() throws Exception {
    Timer cancelCompact = new Timer("CancelCompactionTimer", false);
    final Thread threadToInterrupt= Thread.currentThread();
    cancelCompact.schedule(new TimerTask() {
      @Override
      public void run() {
        threadToInterrupt.interrupt();
      }
    }, 5000);
    long start = System.currentTimeMillis();
    runStatementOnDriver("alter table " + Table.ACIDTBL + " compact 'major' AND WAIT");
    //no Worker so it stays in initiated state
    //w/o AND WAIT the above alter table retunrs almost immediately, so the test here to check that
    //> 2 seconds pass, i.e. that the command in Driver actually blocks before cancel is fired
    Assert.assertTrue(System.currentTimeMillis() > start + 2);
  }

  @Test
  public void testMergeCase() throws Exception {
    runStatementOnDriver("create table merge_test (c1 integer, c2 integer, c3 integer) CLUSTERED BY (c1) into 2 buckets stored as orc tblproperties(\"transactional\"=\"true\")");
    runStatementOnDriver("create table if not exists e011_02 (c1 float, c2 double, c3 float)");
    runStatementOnDriver("merge into merge_test using e011_02 on (merge_test.c1 = e011_02.c1) when not matched then insert values (case when e011_02.c1 > 0 then e011_02.c1 + 1 else e011_02.c1 end, e011_02.c2 + e011_02.c3, coalesce(e011_02.c3, 1))");
  }
  /**
   * HIVE-16177
   * See also {@link TestTxnCommands2#testNonAcidToAcidConversion02()}
   */
  @Test
  public void testNonAcidToAcidConversion01() throws Exception {
    //create 1 row in a file 000001_0 (and an empty 000000_0)
    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
    //create 1 row in a file 000000_0_copy1 and 1 row in a file 000001_0_copy1
    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,12),(1,5)");

    //convert the table to Acid
    runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
    //create a delta directory
    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,17)");

    boolean isVectorized = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
    String query = "select ROW__ID, a, b" + (isVectorized ? " from  " : ", INPUT__FILE__NAME from ") +  Table.NONACIDORCTBL + " order by ROW__ID";
    String[][] expected = new String[][] {
      {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "nonacidorctbl/000001_0"},
      {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t0\t12", "nonacidorctbl/000001_0_copy_1"},
      {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":2}\t1\t5", "nonacidorctbl/000001_0_copy_1"},
      {"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t1\t17", "nonacidorctbl/delta_10000001_10000001_0000/bucket_00001_0"}
    };
    checkResult(expected, query, isVectorized, "before compact", LOG);

    Assert.assertEquals(536870912,
        BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(0)));
    Assert.assertEquals(536936448,
        BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(1)));

    //run Compaction
    runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " compact 'major'");
    runWorker(hiveConf);

    query = "select ROW__ID, a, b" + (isVectorized ? "" : ", INPUT__FILE__NAME") + " from "
        + Table.NONACIDORCTBL + " order by ROW__ID";
    String[][] expected2 = new String[][] {
        {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "nonacidorctbl/base_10000001_v0000010/bucket_00001"},
        {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t0\t12", "nonacidorctbl/base_10000001_v0000010/bucket_00001"},
        {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":2}\t1\t5", "nonacidorctbl/base_10000001_v0000010/bucket_00001"},
        {"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t1\t17", "nonacidorctbl/base_10000001_v0000010/bucket_00001"}
    };
    checkResult(expected2, query, isVectorized, "after major compact", LOG);
    //make sure they are the same before and after compaction
  }
  //@Ignore("see bucket_num_reducers_acid.q")
  @Test
  public void testMoreBucketsThanReducers() throws Exception {
    //see bucket_num_reducers.q bucket_num_reducers2.q
    // todo: try using set VerifyNumReducersHook.num.reducers=10;
    d.destroy();
    HiveConf hc = new HiveConf(hiveConf);
    hc.setIntVar(HiveConf.ConfVars.MAX_REDUCERS, 1);
    //this is used in multiple places, SemanticAnalyzer.getBucketingSortingDest() among others
    hc.setIntVar(HiveConf.ConfVars.HADOOP_NUM_REDUCERS, 1);
    hc.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false);
    d = new Driver(hc);
    d.setMaxRows(10000);
    runStatementOnDriver("insert into " + Table.ACIDTBL + " values(1,1)"); //txn X write to bucket1
    runStatementOnDriver("insert into " + Table.ACIDTBL + " values(0,0),(3,3)"); // txn X + 1 write to bucket0 + bucket1
    runStatementOnDriver("update " + Table.ACIDTBL + " set b = -1");
    List<String> r = runStatementOnDriver("select * from " + Table.ACIDTBL + " order by a, b");
    int[][] expected = {{0, -1}, {1, -1}, {3, -1}};
    Assert.assertEquals(stringifyValues(expected), r);
  }
  @Ignore("Moved to Tez")
  @Test
  public void testMoreBucketsThanReducers2() throws Exception {
    //todo: try using set VerifyNumReducersHook.num.reducers=10;
    //see bucket_num_reducers.q bucket_num_reducers2.q
    d.destroy();
    HiveConf hc = new HiveConf(hiveConf);
    hc.setIntVar(HiveConf.ConfVars.MAX_REDUCERS, 2);
    //this is used in multiple places, SemanticAnalyzer.getBucketingSortingDest() among others
    hc.setIntVar(HiveConf.ConfVars.HADOOP_NUM_REDUCERS, 2);
    d = new Driver(hc);
    d.setMaxRows(10000);
    runStatementOnDriver("create table fourbuckets (a int, b int) clustered by (a) into 4 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
    //below value for a is bucket id, for b - txn id (logically)
    runStatementOnDriver("insert into fourbuckets values(0,1),(1,1)"); //txn X write to b0 + b1
    runStatementOnDriver("insert into fourbuckets values(2,2),(3,2)"); // txn X + 1 write to b2 + b3
    runStatementOnDriver("insert into fourbuckets values(0,3),(1,3)"); //txn X + 2 write to b0 + b1
    runStatementOnDriver("insert into fourbuckets values(2,4),(3,4)"); //txn X + 3 write to b2 + b3
    //so with 2 FileSinks and 4 buckets, FS1 should see (0,1),(2,2),(0,3)(2,4) since data is sorted by ROW__ID where tnxid is the first component
    //FS2 should see (1,1),(3,2),(1,3),(3,4)

    runStatementOnDriver("update fourbuckets set b = -1");
    List<String> r = runStatementOnDriver("select * from fourbuckets order by a, b");
    int[][] expected = {{0, -1},{0, -1}, {1, -1}, {1, -1}, {2, -1}, {2, -1}, {3, -1}, {3, -1}};
    Assert.assertEquals(stringifyValues(expected), r);
  }
  @Test
  public void testVersioningVersionFileEnabled() throws Exception {
    acidVersionTest(true);
  }

  @Test
  public void testVersioningVersionFileDisabled() throws Exception {
    acidVersionTest(false);
  }

  private void acidVersionTest(boolean enableVersionFile) throws Exception {
    boolean originalEnableVersionFile = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE);
    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE, enableVersionFile);

    hiveConf.set(MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID.getVarname(), "true");
    // Need to close the thread local Hive object so that configuration change is reflected to HMS.
    Hive.closeCurrent();
    runStatementOnDriver("drop table if exists T");
    runStatementOnDriver("create table T (a int, b int) stored as orc");
    int[][] data = {{1, 2}};
    //create 1 delta file bucket_00000
    runStatementOnDriver("insert into T" + makeValuesClause(data));
    runStatementOnDriver("update T set a=3 where b=2");

    FileSystem fs = FileSystem.get(hiveConf);
    RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path(getWarehouseDir(), "t"), true);
    CompactorTestUtilities.checkAcidVersion(files, fs, enableVersionFile,
        new String[] { AcidUtils.DELTA_PREFIX, AcidUtils.DELETE_DELTA_PREFIX });

    runStatementOnDriver("alter table T compact 'minor'");
    runWorker(hiveConf);

    // Check status of compaction job
    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
    ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
    Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
    Assert.assertEquals("Unexpected 0 compaction state",
        TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
    Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId().startsWith("job_local"));
    Assert.assertTrue(resp.getCompacts().get(0).getType().equals(CompactionType.MINOR));

    // Check the files after minor compaction
    files = fs.listFiles(new Path(getWarehouseDir(), "t"), true);
    CompactorTestUtilities.checkAcidVersion(files, fs, enableVersionFile,
        new String[] { AcidUtils.DELTA_PREFIX, AcidUtils.DELETE_DELTA_PREFIX });

    runStatementOnDriver("insert into T" + makeValuesClause(data));

    runStatementOnDriver("alter table T compact 'major'");
    runWorker(hiveConf);

    // Check status of compaction job
    txnHandler = TxnUtils.getTxnStore(hiveConf);
    resp = txnHandler.showCompact(new ShowCompactRequest());
    Assert.assertEquals("Unexpected number of compactions in history", 2, resp.getCompactsSize());
    Assert.assertEquals("Unexpected 1 compaction state",
        TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(1).getState());
    Assert.assertTrue(resp.getCompacts().get(1).getHadoopJobId().startsWith("job_local"));

    // Check the files after major compaction
    files = fs.listFiles(new Path(getWarehouseDir(), "t"), true);
    CompactorTestUtilities.checkAcidVersion(files, fs, enableVersionFile,
        new String[] { AcidUtils.DELTA_PREFIX, AcidUtils.DELETE_DELTA_PREFIX, AcidUtils.BASE_PREFIX });

    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE, originalEnableVersionFile);
  }

  @Test
  public void testTruncateWithBase() throws Exception{
    HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_TRUNCATE_USE_BASE, true);
    
    runStatementOnDriver("insert into " + Table.ACIDTBL + " values(1,2),(3,4)");
    runStatementOnDriver("truncate table " + Table.ACIDTBL);

    FileSystem fs = FileSystem.get(hiveConf);
    FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBL.toString().toLowerCase()),
        AcidUtils.baseFileFilter);
    if (1 != stat.length) {
      Assert.fail("Expecting 1 base and found " + stat.length + " files " + Arrays.toString(stat));
    }
    String name = stat[0].getPath().getName();
    Assert.assertEquals("base_0000002", name);

    List<String> r = runStatementOnDriver("select * from " + Table.ACIDTBL);
    Assert.assertEquals(0, r.size());
  }

  @Test
  public void testTruncateWithBaseAllPartition() throws Exception{
    HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_TRUNCATE_USE_BASE, true);
    
    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p='a') values(1,2),(3,4)");
    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p='b') values(1,2),(3,4)");
    runStatementOnDriver("truncate table " + Table.ACIDTBLPART);

    FileSystem fs = FileSystem.get(hiveConf);
    FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase() + "/p=a"),
        AcidUtils.baseFileFilter);
    if (1 != stat.length) {
      Assert.fail("Expecting 1 base and found " + stat.length + " files " + Arrays.toString(stat));
    }
    String name = stat[0].getPath().getName();
    Assert.assertEquals("base_0000003", name);

    List<String> r = runStatementOnDriver("select * from " + Table.ACIDTBLPART);
    Assert.assertEquals(0, r.size());
  }

  @Test
  public void testTruncateWithBaseOnePartition() throws Exception{
    HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_TRUNCATE_USE_BASE, true);
    
    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p='a') values(1,2),(3,4)");
    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p='b') values(5,5),(4,4)");
    runStatementOnDriver("truncate table " + Table.ACIDTBLPART + " partition(p='b')");

    FileSystem fs = FileSystem.get(hiveConf);
    FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase() + "/p=b"),
        AcidUtils.baseFileFilter);
    if (1 != stat.length) {
      Assert.fail("Expecting 1 base and found " + stat.length + " files " + Arrays.toString(stat));
    }
    String name = stat[0].getPath().getName();
    Assert.assertEquals("base_0000003", name);
    stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase() + "/p=a"),
        AcidUtils.deltaFileFilter);
    if (1 != stat.length) {
      Assert.fail("Expecting 1 delta and found " + stat.length + " files " + Arrays.toString(stat));
    }

    List<String> r = runStatementOnDriver("select * from " + Table.ACIDTBLPART);
    Assert.assertEquals(2, r.size());
  }

  @Test
  public void testDropWithBaseAndRecreateOnePartition() throws Exception {
    dropWithBaseOnePartition(true);
  }
  @Test
  public void testDropWithBaseOnePartition() throws Exception {
    dropWithBaseOnePartition(false);
  }
  
  private void dropWithBaseOnePartition(boolean reCreate) throws Exception {
    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition (p='a') values (1,2),(3,4)");
    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition (p='b') values (5,5),(4,4)");

    HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_DROP_PARTITION_USE_BASE, true);
    runStatementOnDriver("alter table " + Table.ACIDTBLPART + " drop partition (p='b')");

    FileSystem fs = FileSystem.get(hiveConf);
    FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase() + "/p=b"),
        AcidUtils.baseFileFilter);
    if (1 != stat.length) {
      Assert.fail("Expecting 1 base and found " + Arrays.toString(stat));
    }
    String name = stat[0].getPath().getName();
    Assert.assertEquals("base_0000003", name);
    
    List<String> r = runStatementOnDriver("select * from " + Table.ACIDTBLPART);
    Assert.assertEquals(2, r.size());
    Assert.assertTrue(isEqualCollection(r, asList("1\t2\ta", "3\t4\ta")));
    
    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
    ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
    
    Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
    Assert.assertTrue(resp.getCompacts().stream().anyMatch(
        ci -> TxnStore.CLEANING_RESPONSE.equals(ci.getState()) && "p=b".equals(ci.getPartitionname())));
    if (reCreate) {
      runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition (p='b') values (3,3)");
    }
    runCleaner(hiveConf);
    
    stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase()),
        path -> path.getName().equals("p=b"));
    if ((reCreate ? 1 : 0) != stat.length) {
      Assert.fail("Partition data was " + (reCreate ? "" : "not") + " removed from FS");
    }
    if (reCreate) {
      r = runStatementOnDriver("select * from " + Table.ACIDTBLPART);
      Assert.assertEquals(3, r.size());
      Assert.assertTrue(isEqualCollection(r, asList("1\t2\ta", "3\t4\ta", "3\t3\tb")));
    }
  }

  @Test
  public void testDropWithBaseMultiplePartitions() throws Exception {
    runStatementOnDriver("insert into " + Table.ACIDTBLNESTEDPART + " partition (p1='a', p2='a', p3='a') values (1,1),(2,2)");
    runStatementOnDriver("insert into " + Table.ACIDTBLNESTEDPART + " partition (p1='a', p2='a', p3='b') values (3,3),(4,4)");
    runStatementOnDriver("insert into " + Table.ACIDTBLNESTEDPART + " partition (p1='a', p2='b', p3='c') values (7,7),(8,8)");
    
    HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_DROP_PARTITION_USE_BASE, true);
    runStatementOnDriver("alter table " + Table.ACIDTBLNESTEDPART + " drop partition (p2='a')");
    
    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
    ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
    Assert.assertEquals("Unexpected number of compactions in history", 2, resp.getCompactsSize());
    
    FileSystem fs = FileSystem.get(hiveConf);
    FileStatus[] stat;

    for (char p : asList('a', 'b')) {
      String partName = "p1=a/p2=a/p3=" + p;
      Assert.assertTrue(resp.getCompacts().stream().anyMatch(
          ci -> TxnStore.CLEANING_RESPONSE.equals(ci.getState()) && partName.equals(ci.getPartitionname())));
      
      stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLNESTEDPART.toString().toLowerCase() + "/" + partName),
          AcidUtils.baseFileFilter);
      if (1 != stat.length) {
        Assert.fail("Expecting 1 base and found " + stat.length + " files " + Arrays.toString(stat));
      }
      String name = stat[0].getPath().getName();
      Assert.assertEquals("base_0000004", name);
    }
    stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLNESTEDPART.toString().toLowerCase() + "/p1=a/p2=b/p3=c"),
        AcidUtils.baseFileFilter);
    if (0 != stat.length) {
      Assert.fail("Expecting no base and found " + stat.length + " files " + Arrays.toString(stat));
    }
    
    List<String> r = runStatementOnDriver("select * from " + Table.ACIDTBLNESTEDPART);
    Assert.assertEquals(2, r.size());
    
    runCleaner(hiveConf);

    for (char p : asList('a', 'b')) {
      stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLNESTEDPART.toString().toLowerCase() + "/p1=a/p2=a"),
          path -> path.getName().equals("p3=" + p));
      if (0 != stat.length) {
        Assert.fail("Partition data was not removed from FS");
      }
    }
  }
  
  @Test
  public void testDropDatabaseCascadePerTableNonBlocking() throws Exception {
    MetastoreConf.setLongVar(hiveConf, MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX, 1);
    dropDatabaseCascadeNonBlocking();
  }

  @Test
  public void testDropDatabaseCascadePerDbNonBlocking() throws Exception {
    dropDatabaseCascadeNonBlocking();
  }
  
  @Test
  public void testDropDatabaseCascadePerDbNonBlockingFilterTableNames() throws Exception {
    DummyMetaStoreFilterHookImpl.blockResults = true;
    dropDatabaseCascadeNonBlocking();
  }
  
  private void dropDatabaseCascadeNonBlocking() throws Exception {
    String database = "mydb";
    String tableName = "tab_acid";
    
    runStatementOnDriver("drop database if exists " + database + " cascade");
    runStatementOnDriver("create database " + database);
    
    // Create transactional table/materialized view with lockless-reads feature disabled
    runStatementOnDriver("create table " + database + "." + tableName + "1 (a int, b int) " +
      "partitioned by (ds string) stored as orc TBLPROPERTIES ('transactional'='true')");
    runStatementOnDriver("insert into " + database + "." + tableName + "1 partition(ds) values(1,2,'foo'),(3,4,'bar')");

    runStatementOnDriver("create materialized view " + database + ".mv_" + tableName + "1 " +
      "partitioned on (ds) stored as orc TBLPROPERTIES ('transactional'='true')" +
      "as select a, ds from " + database + "." + tableName + "1 where b > 1");
    
    HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED, true);
    // Create transactional table/materialized view with lockless-reads feature enabled
    runStatementOnDriver("create table " + database + "." + tableName + "2 (a int, b int) " +
      "partitioned by (ds string) stored as orc TBLPROPERTIES ('transactional'='true')");
    runStatementOnDriver("insert into " + database + "." + tableName + "2 partition(ds) values(1,2,'foo'),(3,4,'bar')");

    runStatementOnDriver("create materialized view " + database + ".mv_" + tableName + "2 " +
      "partitioned on (ds) stored as orc TBLPROPERTIES ('transactional'='true')" +
      "as select a, ds from " + database + "." + tableName + "2 where b > 1");

    // Create external partition data
    runStatementOnDriver("drop table if exists Tstage");
    runStatementOnDriver("create table Tstage (a int, b int) stored as orc" +
      " tblproperties('transactional'='false')");
    runStatementOnDriver("insert into Tstage values(0,2),(0,4)");
    
    runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'");
    runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/2'");
    
    // Create external table
    runStatementOnDriver("create external table " + database + ".tab_ext (a int, b int) " +
      "partitioned by (ds string) stored as parquet");
    runStatementOnDriver("insert into " + database + ".tab_ext partition(ds) values(1,2,'foo'),(3,4,'bar')");
    // Add partition with external location
    runStatementOnDriver("alter table " + database + ".tab_ext add partition (ds='baz') location '" +getWarehouseDir() + "/1/data'");

    // Create managed table
    runStatementOnDriver("create table " + database + ".tab_nonacid (a int, b int) " +
      "partitioned by (ds string) stored as parquet");
    runStatementOnDriver("insert into " + database + ".tab_nonacid partition(ds) values(1,2,'foo'),(3,4,'bar')");
    // Add partition with external location
    runStatementOnDriver("alter table " + database + ".tab_nonacid add partition (ds='baz') location '" +getWarehouseDir() + "/2/data'");
   
    // Drop database cascade
    runStatementOnDriver("drop database " + database + " cascade");

    FileSystem fs = FileSystem.get(hiveConf);
    FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir(), database + ".db"),
      t -> t.getName().matches("(mv_)?" + tableName + "2" + SOFT_DELETE_TABLE_PATTERN));
    if (2 != stat.length) {
      Assert.fail("Table data was removed from FS");
    }
    stat = fs.listStatus(new Path(getWarehouseDir(), database + ".db"));
    Assert.assertEquals(2, stat.length);
    // External table under warehouse external location should be removed
    stat = fs.listStatus(new Path(getWarehouseDir(), "ext"));
    Assert.assertEquals(0, stat.length);
    // External partition for the external table should remain
    stat = fs.listStatus(new Path(getWarehouseDir(),"1"),
      t -> t.getName().equals("data"));
    Assert.assertEquals(1, stat.length);
    // External partition for managed table should be removed
    stat = fs.listStatus(new Path(getWarehouseDir(), "2"),
      t -> t.getName().equals("data"));
    Assert.assertEquals(0, stat.length);

    runCleaner(hiveConf);

    stat = fs.listStatus(new Path(getWarehouseDir(), database + ".db"),
      t -> t.getName().matches("(mv_)?" + tableName + "2" + SOFT_DELETE_TABLE_PATTERN));
    if (stat.length != 0) {
      Assert.fail("Table data was not removed from FS");
    }
  }
  
  @Test
  public void testDropTableWithSuffix() throws Exception {
    String tableName = "tab_acid";
    runStatementOnDriver("drop table if exists " + tableName);
    HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX, true);

    runStatementOnDriver("create table " + tableName + "(a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')");
    runStatementOnDriver("insert into " + tableName + " values(1,2),(3,4)");
    runStatementOnDriver("drop table " + tableName);

    int count = TestTxnDbUtil.countQueryAgent(hiveConf, 
      "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE = '" + tableName + "'");
    Assert.assertEquals(1, count);
    
    FileSystem fs = FileSystem.get(hiveConf);
    FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir()),
      t -> t.getName().matches(tableName + SOFT_DELETE_TABLE_PATTERN));
    if (1 != stat.length) {
      Assert.fail("Table data was removed from FS");
    }
    MetastoreTaskThread houseKeeperService = new AcidHouseKeeperService();
    houseKeeperService.setConf(hiveConf);
    
    houseKeeperService.run();
    count = TestTxnDbUtil.countQueryAgent(hiveConf,
      "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE = '" + tableName + "'");
    Assert.assertEquals(0, count);

    try {
      runStatementOnDriver("select * from " + tableName);
    } catch (Exception ex) {
      Assert.assertTrue(ex.getMessage().contains(
        ErrorMsg.INVALID_TABLE.getMsg(StringUtils.wrap(tableName, "'"))));
    }
    // Check status of compaction job
    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
    ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
    
    Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
    Assert.assertEquals("Unexpected 0 compaction state",
      TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());

    runCleaner(hiveConf);
    
    FileStatus[] status = fs.listStatus(new Path(getWarehouseDir()),
      t -> t.getName().matches(tableName + SOFT_DELETE_TABLE_PATTERN));
    Assert.assertEquals(0, status.length);
  }

  @Test
  public void testDropTableWithoutSuffix() throws Exception {
    String tableName = "tab_acid";
    runStatementOnDriver("drop table if exists " + tableName);
    
    for (boolean enabled : asList(false, true)) {
      HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX, enabled);
      runStatementOnDriver("create table " + tableName + "(a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')");
      runStatementOnDriver("insert into " + tableName + " values(1,2),(3,4)");
      
      HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX, !enabled);
      runStatementOnDriver("drop table " + tableName);

      int count = TestTxnDbUtil.countQueryAgent(hiveConf,
        "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE = '" + tableName + "'");
      Assert.assertEquals(0, count);

      FileSystem fs = FileSystem.get(hiveConf);
      FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir()),
        t -> t.getName().equals(tableName));
      Assert.assertEquals(0, stat.length);

      try {
        runStatementOnDriver("select * from " + tableName);
      } catch (Exception ex) {
        Assert.assertTrue(ex.getMessage().contains(
          ErrorMsg.INVALID_TABLE.getMsg(StringUtils.wrap(tableName, "'"))));
      }
      // Check status of compaction job
      TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
      ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
      Assert.assertEquals("Unexpected number of compactions in history", 0, resp.getCompactsSize());
    }
  }

  @Test
  public void testDropMaterializedViewWithSuffix() throws Exception {
    String tableName = "tab_acid";
    String mviewName = "mv_" + tableName;
    runStatementOnDriver("drop materialized view if exists " + mviewName);
    runStatementOnDriver("drop table if exists " + tableName);
    HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX, true);

    runStatementOnDriver("create table " + tableName + "(a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')");
    runStatementOnDriver("insert into " + tableName + " values(1,2),(3,4)");
    runStatementOnDriver("create materialized view " + mviewName + " stored as orc TBLPROPERTIES ('transactional'='true') " +
      "as select a from tab_acid where b > 1");
    runStatementOnDriver("drop materialized view " + mviewName);

    int count = TestTxnDbUtil.countQueryAgent(hiveConf,
      "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE = '" + mviewName + "'");
    Assert.assertEquals(1, count);

    FileSystem fs = FileSystem.get(hiveConf);
    FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir()),
      t -> t.getName().matches(mviewName + SOFT_DELETE_TABLE_PATTERN));
    if (1 != stat.length) {
      Assert.fail("Materialized view data was removed from FS");
    }
    MetastoreTaskThread houseKeeperService = new AcidHouseKeeperService();
    houseKeeperService.setConf(hiveConf);

    houseKeeperService.run();
    count = TestTxnDbUtil.countQueryAgent(hiveConf,
      "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE = '" + mviewName + "'");
    Assert.assertEquals(0, count);

    try {
      runStatementOnDriver("select * from " + mviewName);
    } catch (Exception ex) {
      Assert.assertTrue(ex.getMessage().contains(
        ErrorMsg.INVALID_TABLE.getMsg(StringUtils.wrap(mviewName, "'"))));
    }
    // Check status of compaction job
    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
    ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());

    Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
    Assert.assertEquals("Unexpected 0 compaction state",
      TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());

    runCleaner(hiveConf);

    FileStatus[] status = fs.listStatus(new Path(getWarehouseDir()),
      t -> t.getName().matches(mviewName + SOFT_DELETE_TABLE_PATTERN));
    Assert.assertEquals(0, status.length);
  }

  @Test
  public void testDropMaterializedViewWithoutSuffix() throws Exception {
    String tableName = "tab_acid";
    String mviewName = "mv_" + tableName;
    runStatementOnDriver("drop materialized view if exists " + mviewName);

    for (boolean enabled : asList(false, true)) {
      runStatementOnDriver("drop table if exists " + tableName);
      HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX, enabled);
      
      runStatementOnDriver("create table " + tableName + "(a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')");
      runStatementOnDriver("insert into " + tableName + " values(1,2),(3,4)");
      runStatementOnDriver("create materialized view " + mviewName + " stored as orc TBLPROPERTIES ('transactional'='true') " +
        "as select a from tab_acid where b > 1");

      HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX, !enabled);
      runStatementOnDriver("drop materialized view " + mviewName);

      int count = TestTxnDbUtil.countQueryAgent(hiveConf,
        "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE = '" + mviewName + "'");
      Assert.assertEquals(0, count);

      FileSystem fs = FileSystem.get(hiveConf);
      FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir()),
        t -> t.getName().equals(mviewName));
      Assert.assertEquals(0, stat.length);

      try {
        runStatementOnDriver("select * from " + mviewName);
      } catch (Exception ex) {
        Assert.assertTrue(ex.getMessage().contains(
          ErrorMsg.INVALID_TABLE.getMsg(StringUtils.wrap(mviewName, "'"))));
      }
      // Check status of compaction job
      TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
      ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
      Assert.assertEquals("Unexpected number of compactions in history", 0, resp.getCompactsSize());
    }
  }

  @Test
  public void testRenameMakeCopyPartition() throws Exception {
    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition (p='a') values (1,2),(3,4)");
    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition (p='b') values (5,5),(4,4)");

    HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_RENAME_PARTITION_MAKE_COPY, true);
    runStatementOnDriver("alter table " + Table.ACIDTBLPART + " partition (p='b') rename to partition (p='c')");

    FileSystem fs = FileSystem.get(hiveConf);
    FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase() + "/p=b"),
      AcidUtils.baseFileFilter);
    if (1 != stat.length) {
      Assert.fail("Expecting 1 base and found " + stat.length + " files " + Arrays.toString(stat));
    }
    String name = stat[0].getPath().getName();
    Assert.assertEquals("base_0000003", name);
    stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase() + "/p=a"),
      AcidUtils.baseFileFilter);
    if (0 != stat.length) {
      Assert.fail("Expecting no base and found " + stat.length + " files " + Arrays.toString(stat));
    }
    
    List<String> r = runStatementOnDriver("select * from " + Table.ACIDTBLPART + " where p='b'");
    Assert.assertEquals(0, r.size());

    r = runStatementOnDriver("select * from " + Table.ACIDTBLPART);
    Assert.assertEquals(4, r.size());

    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
    ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());

    Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
    Assert.assertTrue(resp.getCompacts().stream().anyMatch(
      ci -> TxnStore.CLEANING_RESPONSE.equals(ci.getState()) && "p=b".equals(ci.getPartitionname())));

    runCleaner(hiveConf);

    stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase()),
      path -> path.getName().equals("p=b"));
    if (0 != stat.length) {
      Assert.fail("Expecting partition data to be removed from FS");
    }
  }

  @Test
  public void testRenameMakeCopyNestedPartition() throws Exception {
    runStatementOnDriver("insert into " + Table.ACIDTBLNESTEDPART + " partition (p1='a', p2='b', p3='c') values (1,1),(2,2)");
    runStatementOnDriver("insert into " + Table.ACIDTBLNESTEDPART + " partition (p1='a', p2='b', p3='d') values (3,3),(4,4)");

    HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_RENAME_PARTITION_MAKE_COPY, true);
    runStatementOnDriver("alter table " + Table.ACIDTBLNESTEDPART + " partition (p1='a', p2='b', p3='d')" +
      " rename to partition (p1='a', p2='c', p3='d')");

    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
    ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
    Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());

    FileSystem fs = FileSystem.get(hiveConf);
    FileStatus[] stat;
    
    String partName = "p1=a/p2=b/p3=d";
    Assert.assertTrue(resp.getCompacts().stream().anyMatch(
      ci -> TxnStore.CLEANING_RESPONSE.equals(ci.getState()) && partName.equals(ci.getPartitionname())));

    stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLNESTEDPART.toString().toLowerCase() + "/" + partName),
      AcidUtils.baseFileFilter);
    if (1 != stat.length) {
      Assert.fail("Expecting 1 base and found " + stat.length + " files " + Arrays.toString(stat));
    }
    String name = stat[0].getPath().getName();
    Assert.assertEquals("base_0000003", name);
    
    stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLNESTEDPART.toString().toLowerCase() + "/p1=a/p2=c/p3=d"),
      AcidUtils.baseFileFilter);
    if (0 != stat.length) {
      Assert.fail("Expecting no base and found " + stat.length + " files " + Arrays.toString(stat));
    }

    List<String> r = runStatementOnDriver("select * from " + Table.ACIDTBLNESTEDPART);
    Assert.assertEquals(4, r.size());

    runCleaner(hiveConf);
    
    stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLNESTEDPART.toString().toLowerCase() + "/p1=a/p2=b"),
      path -> path.getName().equals("p3=d"));
    if (0 != stat.length) {
      Assert.fail("Expecting partition data to be removed from FS");
    }
  }

  @Test
  public void testIsRawFormatFile() throws Exception {
    dropTables("file_formats");
    
    runStatementOnDriver("CREATE TABLE `file_formats`(`id` int, `name` string) " +
      " ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' " +
      "WITH SERDEPROPERTIES ( " +
      " 'field.delim'='|', " +
      " 'line.delim'='\n'," +
      " 'serialization.format'='|')  " +
      "STORED AS " +
      " INPUTFORMAT " +
      "   'org.apache.hadoop.mapred.TextInputFormat' " +
      " OUTPUTFORMAT " +
      "   'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' " +
      "TBLPROPERTIES ( " +
      " 'transactional'='true'," +
      " 'transactional_properties'='insert_only')");
    
    runStatementOnDriver("insert into file_formats (id, name) values (1, 'Avro'),(2, 'Parquet'),(3, 'ORC')");
    
    List<String> res = runStatementOnDriver("select * from file_formats");
    Assert.assertEquals(3, res.size());
  }
  @Test
  public void testShowCompactions() throws Exception {
    //generate some compaction history
    runStatementOnDriver("drop database if exists mydb1 cascade");
    runStatementOnDriver("create database mydb1");
    runStatementOnDriver("create table mydb1.tbl0 " + "(a int, b int) partitioned by (p string) clustered by (a) into " +
      BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
    runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
      " values(1,2,'p1'),(3,4,'p1'),(1,2,'p2'),(3,4,'p2'),(1,2,'p3'),(3,4,'p3')");
    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p1') compact 'MAJOR'");
    TestTxnCommands2.runWorker(hiveConf);
    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p2') compact 'MAJOR'");
    TestTxnCommands2.runWorker(hiveConf);
    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p3') compact 'MAJOR'");
    TestTxnCommands2.runWorker(hiveConf);
    runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
      " values(4,5,'p1'),(6,7,'p1'),(4,5,'p2'),(6,7,'p2'),(4,5,'p3'),(6,7,'p3')");
    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION (p='p1') compact 'MAJOR'");
    TestTxnCommands2.runWorker(hiveConf);
    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION (p='p2') compact 'MAJOR'");
    TestTxnCommands2.runWorker(hiveConf);
    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION (p='p3')  compact 'MAJOR' pool 'pool0'");
    TestTxnCommands2.runWorker(hiveConf);
    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);

    SessionState.get().setCurrentDatabase("mydb1");

    //testing show compaction command
    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
    List<String> r = runStatementOnDriver("SHOW COMPACTIONS");
    Assert.assertEquals(rsp.getCompacts().size()+1, r.size());//includes Header row

    r = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb1 STATUS 'ready for cleaning'");
    Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getState().equals("ready for cleaning")).count() +1,
            r.size());//includes Header row
    Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" +
      "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" +
      "Highest WriteId", r.get(0));
    Pattern p = Pattern.compile(".*mydb1.*\tready for cleaning.*");
    for(int i = 1; i < r.size(); i++) {
      Assert.assertTrue(p.matcher(r.get(i)).matches());
    }
    r = runStatementOnDriver("SHOW COMPACTIONS COMPACTIONID=1");
    Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getId()==1).count() +1,
            r.size());//includes Header row
    Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" +
            "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" +
            "Highest WriteId", r.get(0));
    p = Pattern.compile("1\t.*");
    for(int i = 1; i < r.size(); i++) {
      Assert.assertTrue(p.matcher(r.get(i)).matches());
    }
    r = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb1 TYPE 'MAJOR' ");
    Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getDbname().equals("mydb1")).
      filter(x->x.getType().equals(CompactionType.MAJOR)).count()+1, r.size());//includes Header row
    Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" +
   "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" +
   "Highest WriteId", r.get(0));
    p = Pattern.compile(".*mydb1.*\tMAJOR.*");
    for(int i = 1; i < r.size(); i++) {
      Assert.assertTrue(p.matcher(r.get(i)).matches());
    }

    r = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb1 POOL 'poolx' TYPE 'MINOR' ");
    //includes Header row
    Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getDbname().equals("mydb1")).
      filter(x->x.getPoolName().equals("poolx")).filter(x->x.getType().equals(CompactionType.MAJOR)).count()+1, r.size());
    Assert.assertEquals(1,r.size());//only header row

    r = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb1 POOL 'pool0' TYPE 'MAJOR'");
    Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getDbname().equals("mydb1")).
      filter(x->x.getPoolName().equals("pool0")).filter(x->x.getType().equals(CompactionType.MAJOR)).count()+1, r.size());//includes Header row
    Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" +
    "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" +
    "Highest WriteId", r.get(0));
    p = Pattern.compile(".*mydb1.*\tMAJOR.*\tpool0.*");
    for(int i = 1; i < r.size(); i++) {
      Assert.assertTrue(p.matcher(r.get(i)).matches());
    }

    r = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb1 POOL 'pool0'");
    Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getDbname().equals("mydb1")).
      filter(x->x.getPoolName().equals("pool0")).count()+1, r.size());//includes Header row
    Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" +
    "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" +
    "Highest WriteId", r.get(0));
    p = Pattern.compile(".*mydb1.*\tpool0.*");
    for(int i = 1; i < r.size(); i++) {
      Assert.assertTrue(p.matcher(r.get(i)).matches());
    }

    r = runStatementOnDriver("SHOW COMPACTIONS DATABASE mydb1 POOL 'pool0'");
    Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getDbname().equals("mydb1")).
      filter(x->x.getPoolName().equals("pool0")).count()+1, r.size());//includes Header row
    Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" +
            "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" +
            "Highest WriteId", r.get(0));
    p = Pattern.compile(".*mydb1.*\tpool0.*");
    for(int i = 1; i < r.size(); i++) {
      Assert.assertTrue(p.matcher(r.get(i).toString()).matches());
    }

    r = runStatementOnDriver("SHOW COMPACTIONS tbl0 TYPE 'MAJOR' ");
    Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getTablename().equals("tbl0")).
      filter(x->x.getType().equals(CompactionType.MAJOR)).count()+1, r.size());//includes Header row
    Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" +
            "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" +
            "Highest WriteId", r.get(0));
    p = Pattern.compile(".*tbl0.*\tMAJOR.*");
    for(int i = 1; i < r.size(); i++) {
      Assert.assertTrue(p.matcher(r.get(i)).matches());
    }

    r = runStatementOnDriver("SHOW COMPACTIONS mydb1.tbl0 PARTITION (p='p3') ");
    //includes Header row
    Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getDbname().equals("mydb1")).
      filter(x->x.getTablename().equals("tbl0")).filter(x->x.getPartitionname().equals("p=p3")).count() + 1, r.size());
    Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" +
            "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" +
            "Highest WriteId", r.get(0));
    p = Pattern.compile(".*mydb1\ttbl0\tp=p3.*");
    for(int i = 1; i < r.size(); i++) {
      Assert.assertTrue(p.matcher(r.get(i)).matches());
    }
    //includes Header row
    r = runStatementOnDriver("SHOW COMPACTIONS mydb1.tbl0 PARTITION (p='p3') pool 'pool0' TYPE 'MAJOR'");
    Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getDbname().equals("mydb1")).
      filter(x->x.getTablename().equals("tbl0")).filter(x->x.getPartitionname().equals("p=p3")).
      filter(x->x.getPoolName().equals("pool0")).filter(x->x.getType().equals(CompactionType.MAJOR)).count() + 1, r.size());
    Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" +
            "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" +
            "Highest WriteId", r.get(0));
    p = Pattern.compile(".*mydb1\ttbl0\tp=p3\tMAJOR.*\tpool0.*");
    for(int i = 1; i < r.size(); i++) {
      Assert.assertTrue(p.matcher(r.get(i)).matches());
    }

  }

  @Test
  public void testShowCompactionFilterWithPartition()throws Exception {
    setUpCompactionRequestsData("mydb","tbl2");
    executeCompactionRequest("mydb","tbl2", "MAJOR","ds='mon'");
    SessionState.get().setCurrentDatabase("mydb");

    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());

    //includes Header row
    List<String> r = runStatementOnDriver("SHOW COMPACTIONS");
    Assert.assertEquals(rsp.getCompacts().size()+1, r.size());

    //includes Header row
    r = runStatementOnDriver("SHOW COMPACTIONS tbl2 STATUS 'refused'");
    Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getState().equals("refused")).count()+1, r.size());
    Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" +
            "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" +
            "Highest WriteId", r.get(0));
    Pattern p = Pattern.compile(".*tbl2.*\trefused.*");
    for(int i = 1; i < r.size(); i++) {
      Assert.assertTrue(p.matcher(r.get(i)).matches());
    }

    //includes Header row
    r = runStatementOnDriver("SHOW COMPACTIONS tbl2 ");
    Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getTablename().equals("tbl2")).count()+1, r.size());
    Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" +
            "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" +
            "Highest WriteId", r.get(0));
    p = Pattern.compile(".*tbl2.*");
    for(int i = 1; i < r.size(); i++) {
      Assert.assertTrue(p.matcher(r.get(i)).matches());
    }

    //includes Header row
    r = runStatementOnDriver("SHOW COMPACTIONS mydb.tbl2 PARTITION (ds='mon') ");
    Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getDbname().equals("mydb")).
    filter(x->x.getTablename().equals("tbl2")).filter(x->x.getPartitionname().equals("ds=mon")).count()+1, r.size());
    Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" +
            "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" +
            "Highest WriteId", r.get(0));
    p = Pattern.compile(".*mydb\ttbl2\tds=mon.*");
    for(int i = 1; i < r.size(); i++) {
      Assert.assertTrue(p.matcher(r.get(i)).matches());
    }

    //includes Header row
    r = runStatementOnDriver("SHOW COMPACTIONS mydb.tbl2 PARTITION (ds='mon') TYPE 'MAJOR' ");
    Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getDbname().equals("mydb")).
            filter(x->x.getTablename().equals("tbl2")).filter(x->x.getPartitionname().equals("ds=mon")).
            filter(x->x.getType().equals(CompactionType.MAJOR)).count()+1, r.size());
    Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" +
            "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" +
            "Highest WriteId", r.get(0));
    p = Pattern.compile(".*mydb\ttbl2\tds=mon\tMAJOR.*");
    for(int i = 1; i < r.size(); i++) {
      Assert.assertTrue(p.matcher(r.get(i)).matches());
    }

    //includes Header row
    r = runStatementOnDriver("SHOW COMPACTIONS DATABASE mydb TYPE 'MAJOR' ");
    Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getDbname().equals("mydb")).
            filter(x->x.getType().equals(CompactionType.MAJOR)).count()+1, r.size());
    Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" +
            "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" +
            "Highest WriteId", r.get(0));
    p = Pattern.compile(".*mydb.*\tMAJOR.*");
    for(int i = 1; i < r.size(); i++) {
      Assert.assertTrue(p.matcher(r.get(i)).matches());
    }

    executeCompactionRequest("mydb","tbl2", "MINOR","ds='wed'");

    rsp = txnHandler.showCompact(new ShowCompactRequest());
    //includes Header row
    r = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb TYPE 'MINOR' ");
    Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getDbname().equals("mydb")).
            filter(x->x.getType().equals(CompactionType.MINOR)).count()+1, r.size());
    Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" +
            "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" +
            "Highest WriteId", r.get(0));
    p = Pattern.compile(".*mydb.*\tMINOR.*");
    for(int i = 1; i < r.size(); i++) {
      Assert.assertTrue(p.matcher(r.get(i)).matches());
    }

    //includes Header row
    r = runStatementOnDriver("SHOW COMPACTIONS  mydb.tbl2 PARTITION (ds='wed') ");
    Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getDbname().equals("mydb")).
    filter(x->x.getTablename().equals("tbl2")).filter(x->x.getPartitionname().equals("ds=wed")).count()+1, r.size());
    for(int i=1;i<r.size();i++) {
      Assert.assertTrue(r.get(i).contains("mydb"));
      Assert.assertTrue(r.get(i).contains("tbl2"));
      Assert.assertTrue(r.get(i).contains("ds=wed"));
    }

    r = runStatementOnDriver("SHOW COMPACTIONS tbl2 ");
    Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getTablename().equalsIgnoreCase("tbl2")).count()+1, r.size());
    for(int i=1;i<r.size();i++) {
      Assert.assertTrue(r.get(i).contains("tbl2"));
    }
    //includes Header row
    r = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mymydbdb2 TYPE 'MAJOR' ");
    Assert.assertEquals(rsp.getCompacts().stream().filter(x->x.getDbname().equals("mymydbdb2")).
            filter(x->x.getType().equals(CompactionType.MAJOR)).count()+1, r.size());
    Assert.assertEquals(1,r.size());//only header row
  }
  @Test
  public void testShowCompactionInputValidation() throws Exception {
    setUpCompactionRequestsData("mydb2","tbl2");
    executeCompactionRequest("mydb2","tbl2", "MAJOR", "ds='mon'");
    SessionState.get().setCurrentDatabase("mydb2");

    //validation testing of paramters
    expectedException.expect(RuntimeException.class);
    List<String> r  = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb POOL 'pool0' TYPE 'MAJOR'");// validates db
    r = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb2  TYPE 'MAJR'");// validates compaction type
    r = runStatementOnDriver("SHOW COMPACTIONS mydb2.tbl1 PARTITION (ds='mon') TYPE 'MINOR' " +
          "STATUS 'ready for clean'");// validates table
    r = runStatementOnDriver("SHOW COMPACTIONS mydb2.tbl2 PARTITION (p=101,day='Monday') POOL 'pool0' TYPE 'minor' " +
          "STATUS 'ready for clean'");// validates partspec
    r = runStatementOnDriver("SHOW COMPACTIONS mydb1.tbl0 PARTITION (p='p1') POOL 'pool0' TYPE 'minor' " +
          "STATUS 'ready for clean'");//validates compaction status
  }

  @Test
  public void testShowCompactionFilterSortingAndLimit() throws Exception {
    runStatementOnDriver("drop database if exists mydb1 cascade");
    runStatementOnDriver("create database mydb1");
    runStatementOnDriver("create table mydb1.tbl0 " + "(a int, b int) partitioned by (p string) clustered by (a) into " +
            BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
    runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
            " values(1,2,'p1'),(3,4,'p1'),(1,2,'p2'),(3,4,'p2'),(1,2,'p3'),(3,4,'p3')");
    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p1') compact 'MAJOR' pool 'poolx'");
    TestTxnCommands2.runWorker(hiveConf);
    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p2') compact 'MAJOR'");
    TestTxnCommands2.runWorker(hiveConf);


    runStatementOnDriver("drop database if exists mydb cascade");
    runStatementOnDriver("create database mydb");
    runStatementOnDriver("create table mydb.tbl " + "(a int, b int) partitioned by (ds string) clustered by (a) into " +
            BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
    runStatementOnDriver("insert into mydb.tbl" + " PARTITION(ds) " +
            " values(1,2,'mon'),(3,4,'tue'),(1,2,'mon'),(3,4,'tue'),(1,2,'wed'),(3,4,'wed')");
    runStatementOnDriver("alter table mydb.tbl" + " PARTITION(ds='mon') compact 'MAJOR'");
    TestTxnCommands2.runWorker(hiveConf);
    runStatementOnDriver("alter table mydb.tbl" + " PARTITION(ds='tue') compact 'MAJOR'");
    TestTxnCommands2.runWorker(hiveConf);

    runStatementOnDriver("create table mydb.tbl2 " + "(a int, b int) partitioned by (dm string) clustered by (a) into " +
            BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
    runStatementOnDriver("insert into mydb.tbl2" + " PARTITION(dm) " +
            " values(1,2,'xxx'),(3,4,'xxx'),(1,2,'yyy'),(3,4,'yyy'),(1,2,'zzz'),(3,4,'zzz')");
    runStatementOnDriver("alter table mydb.tbl2" + " PARTITION(dm='yyy') compact 'MAJOR'");
    TestTxnCommands2.runWorker(hiveConf);
    runStatementOnDriver("alter table mydb.tbl2" + " PARTITION(dm='zzz') compact 'MAJOR'");
    TestTxnCommands2.runWorker(hiveConf);

    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());

    //includes Header row
    List<String> r = runStatementOnDriver("SHOW COMPACTIONS");
    Assert.assertEquals(rsp.getCompacts().size() + 1, r.size());
    r = runStatementOnDriver("SHOW COMPACTIONS LIMIT 3");
    Assert.assertEquals(4, r.size());
    r = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb TYPE 'MAJOR' LIMIT 2");
    Assert.assertEquals(3, r.size());

    r = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb TYPE 'MAJOR' ORDER BY 'tabname' DESC,'partname' ASC");
    Assert.assertEquals(5, r.size());
    Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" +
            "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" +
            "Highest WriteId", r.get(0));
    Pattern p = Pattern.compile(".*mydb\ttbl2\tdm.*");
    for (int i = 1; i < r.size() - 3; i++) {
      Assert.assertTrue(p.matcher(r.get(i)).matches());
    }
    p = Pattern.compile(".*mydb\ttbl\tds.*");
    for (int i = 3; i < r.size() - 1; i++) {
      Assert.assertTrue(p.matcher(r.get(i)).matches());
    }

    r = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb1 TYPE 'MAJOR' ORDER BY 'poolname' ASC");
    Assert.assertEquals(3, r.size());
    Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" +
            "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" +
            "Highest WriteId", r.get(0));
    List<String> txnIdActualList = r.stream().skip(1).map(x -> x.split("\t")[15]).collect(Collectors.toList());
    List<String> txnIdExpectedList = r.stream().skip(1).map(x -> x.split("\t")[15]).sorted(Collections.reverseOrder()).
            collect(Collectors.toList());
    Assert.assertEquals(txnIdExpectedList, txnIdActualList);
    r = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb TYPE 'MAJOR' ORDER BY 'txnid' DESC");
    Assert.assertEquals(5, r.size());
    Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" +
            "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" +
            "Highest WriteId", r.get(0));
    txnIdActualList = r.stream().skip(1).map(x -> x.split("\t")[16]).collect(Collectors.toList());
    txnIdExpectedList = r.stream().skip(1).map(x -> x.split("\t")[16]).sorted(Collections.reverseOrder()).
            collect(Collectors.toList());
    Collections.sort(txnIdExpectedList, Collections.reverseOrder());
    Assert.assertEquals(txnIdExpectedList, txnIdActualList);

    r = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb TYPE 'MAJOR' ORDER BY TxnId DESC");
    Assert.assertEquals(5, r.size());
    Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time" +
            "\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\t" +
            "Highest WriteId", r.get(0));
    txnIdActualList = r.stream().skip(1).map(x -> x.split("\t")[16]).collect(Collectors.toList());
    txnIdExpectedList = r.stream().skip(1).map(x -> x.split("\t")[16]).sorted(Collections.reverseOrder()).
            collect(Collectors.toList());
    Assert.assertEquals(txnIdExpectedList, txnIdActualList);


    expectedException.expect(RuntimeException.class);
    r = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb TYPE 'MAJOR' ORDER BY tbl DESC,PARTITIONS ASC");
    expectedException.expect(RuntimeException.class);
    r = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb TYPE 'MAJOR' ORDER BY tbl DESC,PARTITIONS ASC");

  }

  @Test
  public void testAbortCompactions() throws Exception {
    //generate some compaction history
    runStatementOnDriver("drop database if exists mydb1 cascade");
    runStatementOnDriver("create database mydb1");
    runStatementOnDriver("create table mydb1.tbl0 " + "(a int, b int) partitioned by (p string) clustered by (a) into " +
            BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
    runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
            " values(1,2,'p1'),(3,4,'p1'),(1,2,'p2'),(3,4,'p2'),(1,2,'p3'),(3,4,'p3')");
    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p1') compact 'MAJOR'");
    TestTxnCommands2.runInitiator(hiveConf);
    TestTxnCommands2.runWorker(hiveConf);
    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p2') compact 'MAJOR'");
    TestTxnCommands2.runWorker(hiveConf);
    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p3') compact 'MAJOR'");
    TestTxnCommands2.runWorker(hiveConf);
    runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
            " values(4,5,'p1'),(6,7,'p1'),(4,5,'p2'),(6,7,'p2'),(4,5,'p3'),(6,7,'p3')");
    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION (p='p1') compact 'MAJOR'");
    TestTxnCommands2.runWorker(hiveConf);
    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION (p='p2') compact 'MAJOR'");
    TestTxnCommands2.runWorker(hiveConf);
    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION (p='p3')  compact 'MAJOR' pool 'pool0'");
    TestTxnCommands2.runInitiator(hiveConf);
    TestTxnCommands2.runWorker(hiveConf);
    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);



    runStatementOnDriver("create table mydb1.tbl2 " + "(a int, b int) partitioned by (p string) clustered by (a) into " +
            BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
    runStatementOnDriver("insert into mydb1.tbl2" + " PARTITION(p) " +
            " values(1,2,'p1'),(3,4,'p1'),(1,2,'p2'),(3,4,'p2'),(1,2,'p3'),(3,4,'p3')");
    runStatementOnDriver("alter table mydb1.tbl2" + " PARTITION(p='p1') compact 'MAJOR'");
    runStatementOnDriver("alter table mydb1.tbl2" + " PARTITION(p='p2') compact 'MAJOR'");
    TestTxnCommands2.runWorker(hiveConf);
    TestTxnCommands2.runCleaner(hiveConf);
    runStatementOnDriver("alter table mydb1.tbl2" + " PARTITION(p='p3') compact 'MAJOR'");
    runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
            " values(4,5,'p1'),(6,7,'p1'),(4,5,'p2'),(6,7,'p2'),(4,5,'p3'),(6,7,'p3')");
    TestTxnCommands2.runWorker(hiveConf);
    TestTxnCommands2.runCleaner(hiveConf);
    runStatementOnDriver("insert into mydb1.tbl2" + " PARTITION(p) " +
            " values(11,12,'p1'),(13,14,'p1'),(11,12,'p2'),(13,14,'p2'),(11,12,'p3'),(13,14,'p3')");
    runStatementOnDriver("alter table mydb1.tbl2" + " PARTITION (p='p1')  compact 'MINOR'");
    TestTxnCommands2.runWorker(hiveConf);

    runStatementOnDriver("create table mydb1.tbl3 " + "(a int, b int) partitioned by (ds string) clustered by (a) into " +
            BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
    runStatementOnDriver("insert into mydb1.tbl3" + " PARTITION(ds) " +
            " values(1,2,'today'),(3,4,'today'),(1,2,'tomorrow'),(3,4,'tomorrow'),(1,2,'yesterday'),(3,4,'yesterday')");
    runStatementOnDriver("alter table mydb1.tbl3" + " PARTITION(ds='today') compact 'MAJOR'");
    TestTxnCommands2.runWorker(hiveConf);

    SessionState.get().setCurrentDatabase("mydb1");

    //testing show compaction command

    List<String> r =  runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb1 STATUS 'initiated'");
    Assert.assertEquals(3,r.size());
    List<String>compIdsToAbort = r.stream().skip(1).map(x -> x.split("\t")[0]).collect(Collectors.toList());
    String abortCompactionCmd = "ABORT COMPACTIONS " +compIdsToAbort.get(0)+"\t"+compIdsToAbort.get(1);
    r = runStatementOnDriver(abortCompactionCmd);
    Assert.assertEquals(3,r.size());
    Assert.assertEquals("CompactionId\tStatus\tMessage", r.get(0));
    Assert.assertTrue(r.get(1).contains("Successfully aborted compaction"));
    Assert.assertTrue(r.get(2).contains("Successfully aborted compaction"));

    abortCompactionCmd = "ABORT COMPACTIONS " +compIdsToAbort.get(0)+"\t"+compIdsToAbort.get(1);
    r = runStatementOnDriver(abortCompactionCmd);
    Assert.assertEquals(3,r.size());
    Assert.assertEquals("CompactionId\tStatus\tMessage", r.get(0));
    Assert.assertTrue(r.get(1).contains("Error"));

    r =  runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb1 STATUS 'aborted'");
    Assert.assertEquals(3,r.size());

  }


  private void setUpCompactionRequestsData(String dbName, String tbName) throws Exception {
    runStatementOnDriver("drop database if exists " + dbName);
    runStatementOnDriver("create database " + dbName);
    runStatementOnDriver("create table " + dbName + "." + tbName + " (a int, b int) partitioned by (ds String)  stored as orc " +
       "TBLPROPERTIES ('transactional'='true')");
    runStatementOnDriver("insert into " + dbName + "." + tbName + " PARTITION (ds) " +
      " values(1,2,'mon'),(3,4,'mon'),(1,2,'tue'),(3,4,'tue'),(1,2,'wed'),(3,4,'wed')");
  }

  private void executeCompactionRequest(String dbName, String tbName, String compactiontype, String partition) throws Exception {
    runStatementOnDriver("alter table "+dbName+"."+tbName+" PARTITION (" +partition+") compact '"+compactiontype + "'" );
    TestTxnCommands2.runWorker(hiveConf);
  }

  @Test
  public void testFetchTaskCachingWithConversion() throws Exception {
    dropTables("fetch_task_table");
    List actualRes = new ArrayList<>();
    runStatementOnDriver("create table fetch_task_table (a INT, b INT) stored as orc" +
            " tblproperties ('transactional'='true')");
    runStatementOnDriver("insert into table fetch_task_table values (1,2), (3,4), (5,6)");
    List expectedRes = runStatementOnDriver("select * from fetch_task_table");

    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_FETCH_TASK_CACHING, true);
    hiveConf.setVar(HiveConf.ConfVars.HIVE_FETCH_TASK_CONVERSION, "none");
    d.run("select * from fetch_task_table");
    Assert.assertFalse(d.getFetchTask().isCachingEnabled());
    d.getFetchTask().fetch(actualRes);
    Assert.assertEquals(actualRes, expectedRes);
    actualRes.clear();

    hiveConf.setVar(HiveConf.ConfVars.HIVE_FETCH_TASK_CONVERSION, "more");
    d.run("select * from fetch_task_table");
    Assert.assertTrue(d.getFetchTask().isCachingEnabled());
    d.getFetchTask().fetch(actualRes);
    Assert.assertEquals(actualRes, expectedRes);
  }
}
